In [2]:
import sqlite3
import langchain
import langchain_community
from langchain_community.utilities import SQLDatabase

In [3]:
db_path = r"./POC-LangChain/chinook-database-master/ChinookDatabase/DataSources/Chinook_Sqlite.sqlite"
db = SQLDatabase.from_uri(f"sqlite:///{db_path}")

In [4]:
import subprocess

# Führe den Befehl aus und erfasse die Ausgabe als Text
result = subprocess.run(["ollama", "list"], capture_output=True, text=True)
output = result.stdout.strip()

print(output)  # Debugging line

lines = output.splitlines()
if not lines:
    print("Keine Ausgabe erhalten.")
    exit()

NAME                     	ID          	SIZE  	MODIFIED     
qwen2.5-coder:7b         	2b0496514337	4.7 GB	7 days ago  	
deepseek-r1:8b           	28f8fd6cdc67	4.9 GB	8 days ago  	
llama3.2:1b-instruct-q4_0	53f2745c8077	770 MB	3 months ago	
llama3.2:1b              	baf6a787fdff	1.3 GB	3 months ago	
llama3.1:8b              	42182419e950	4.7 GB	4 months ago	
mistral:instruct         	f974a74358d6	4.1 GB	4 months ago


In [31]:
model_names = []
for line in lines[1:]:
    parts = line.split()
    if parts:
        model_names.append(parts[0])

print("Model names:", model_names)

Model names: ['qwen2.5-coder:7b', 'deepseek-r1:8b', 'llama3.2:1b-instruct-q4_0', 'llama3.2:1b', 'llama3.1:8b', 'mistral:instruct']


In [6]:
db_dialect = db.dialect
print(db.dialect)
db.get_usable_table_names()

sqlite


['Album',
 'Artist',
 'Customer',
 'Employee',
 'Genre',
 'Invoice',
 'InvoiceLine',
 'MediaType',
 'Playlist',
 'PlaylistTrack',
 'Track']

In [7]:
model_name = model_names[0] # Modell festlegen
model_name

'qwen2.5-coder:7b'

In [8]:
from langchain_ollama import ChatOllama

llm = ChatOllama(
    model=model_name,
    temperature=0,
)
print(f'{model_name} ausgewählt und geladen.')

qwen2.5-coder:7b ausgewählt und geladen.


In [9]:
import sqlite3

def extract_schema(db_path, sample_rows=2, verbose=True):
    """
    Extrahiert das Schema aller Tabellen in der SQLite-Datenbank und gibt
    – abhängig vom Parameter verbose – entweder die Ergebnisse aus oder liefert sie als Dictionary zurück.
    
    Parameter:
      - db_path: Pfad zur Datenbank.
      - sample_rows: Anzahl der Beispielzeilen, die pro Tabelle abgerufen werden.
      - verbose: Falls True, werden die Informationen direkt ausgegeben.
      
    Rückgabe:
      Ein Dictionary, in dem für jede Tabelle die folgenden Informationen enthalten sind:
        - columns: Liste der Spalten (inkl. Typ und Kennzeichnung als PK)
        - foreign_keys: Liste der Fremdschlüssel (Format: "Spalte -> referenzierteTabelle.referenzierteSpalte")
        - sample_data: Abgerufene Beispieldaten (als Liste von Zeilen)
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    
    # Alle Tabellennamen abrufen
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    table_names = [row[0] for row in cursor.fetchall()]
    
    schema = {}
    
    for table in table_names:
        if verbose:
            print(f"Schema for Table '{table}':")
        
        # Spalteninformationen abrufen
        cursor.execute(f"PRAGMA table_info({table});")
        columns_info = cursor.fetchall()  
        columns = []
        for col in columns_info:
            col_name = col[1]
            col_type = col[2]
            is_pk = " (PK)" if col[5] != 0 else ""
            columns.append(f"{col_name} {col_type}{is_pk}")
        if verbose:
            print("Columns:", ", ".join(columns))
        
        # Fremdschlüsselinformationen abrufen
        cursor.execute(f"PRAGMA foreign_key_list({table});")
        fk_info = cursor.fetchall()  
        foreign_keys = []
        for fk in fk_info:
            foreign_keys.append(f"{fk[3]} -> {fk[2]}.{fk[4]}")
        if verbose:
            if foreign_keys:
                print("Foreign Keys:", ", ".join(foreign_keys))
            else:
                print("Foreign Keys: none")
        
        # Beispiel-Daten abrufen
        cursor.execute(f"SELECT * FROM {table} LIMIT {sample_rows};")
        sample_data = cursor.fetchall()
        if verbose:
            print("Sample data:", sample_data)
            print("-" * 80)
        
        schema[table] = {
            "columns": columns,
            "foreign_keys": foreign_keys,
            "sample_data": sample_data
        }
    
    conn.close()
    return schema


In [11]:
schema = extract_schema(db_path, sample_rows=1, verbose=False)

In [12]:
schema_info_str = str(schema)

Prompt-Template erstellen

In [13]:
from langchain import PromptTemplate

# Define the prompt template in English
prompt_template = PromptTemplate(
    input_variables=["schema_info", "question","db_dialect"],
    template="""
You are a helpful SQL assistant that provides only SELECT-Statements.
Based on the following database schema, translate a natural language query into an SQL query that is executable in my {db_dialect}-Database.

Database Schema:
{schema_info}

Using the schema information above, please formulate the appropriate SQL query this question:
Question: {question}

SQL Query:
""".strip()
)

In [21]:
question = "In which city is the customer with the highest customer ID located?"

In [20]:
prompt = prompt_template.format(
    schema_info=schema_info_str, 
    question=question, 
    db_dialect=db_dialect
    )


Ab hier mit ChatGPT

In [33]:
from langchain.chains import LLMChain
from typing_extensions import TypedDict
import re

# Define your State class
class State(TypedDict):
    schema_info: str       # Loaded information about the DB schema
    question: str          # Original natural language question
    db_dialect: str        # SQL dialect (e.g., "sqlite")
    parsed_question: str   # (Optional) Parsed or analyzed question
    query: str             # Generated SQL query
    execution_result: str  # Result from executing the SQL query
    answer: str            # Final answer formatted for the user
    error: str             # Error message (if any)

In [23]:
# Function to extract SQL statement from text
def extract_sql_statement(text: str) -> str:
    """
    Extracts the SQL statement from a given text, starting from "SELECT" 
    until the first semicolon ";".
    """
    pattern = r'(SELECT.*?;)'
    match = re.search(pattern, text, re.IGNORECASE | re.DOTALL)
    if match:
        return match.group(1).strip()
    return ""

In [32]:
chain = LLMChain(llm=llm, prompt=prompt_template)

In [34]:
output = chain.invoke({
            "schema_info": state["schema_info"],
            "question": state["question"],
            "db_dialect": state["db_dialect"]
        })

In [35]:
output

{'schema_info': "{'Album': {'columns': ['AlbumId INTEGER (PK)', 'Title NVARCHAR(160)', 'ArtistId INTEGER'], 'foreign_keys': ['ArtistId -> Artist.ArtistId'], 'sample_data': [(1, 'For Those About To Rock We Salute You', 1)]}, 'Artist': {'columns': ['ArtistId INTEGER (PK)', 'Name NVARCHAR(120)'], 'foreign_keys': [], 'sample_data': [(1, 'AC/DC')]}, 'Customer': {'columns': ['CustomerId INTEGER (PK)', 'FirstName NVARCHAR(40)', 'LastName NVARCHAR(20)', 'Company NVARCHAR(80)', 'Address NVARCHAR(70)', 'City NVARCHAR(40)', 'State NVARCHAR(40)', 'Country NVARCHAR(40)', 'PostalCode NVARCHAR(10)', 'Phone NVARCHAR(24)', 'Fax NVARCHAR(24)', 'Email NVARCHAR(60)', 'SupportRepId INTEGER'], 'foreign_keys': ['SupportRepId -> Employee.EmployeeId'], 'sample_data': [(1, 'Luís', 'Gonçalves', 'Embraer - Empresa Brasileira de Aeronáutica S.A.', 'Av. Brigadeiro Faria Lima, 2170', 'São José dos Campos', 'SP', 'Brazil', '12227-000', '+55 (12) 3923-5555', '+55 (12) 3923-5566', 'luisg@embraer.com.br', 3)]}, 'Employe

In [28]:
# New helper function that uses chain.invoke and updates the State
def extract_sql_query_state(prompt_template, state: State) -> State:
    """
    Uses an LLMChain to generate an SQL query from the natural language question 
    contained in the provided state. Updates the state's 'query' field with the 
    extracted SQL and sets an 'error' field if exceptions occur.
    """
    chain = LLMChain(llm=llm, prompt=prompt_template)
    
    try:
        # Call chain.invoke with a dictionary as input
        output = chain.invoke({
            "schema_info": state["schema_info"],
            "question": state["question"],
            "db_dialect": state["db_dialect"]
        })
        
        # Optionally extract the SQL statement (if extra formatting is added)
        sql_query = extract_sql_statement(output)
        state["query"] = sql_query if sql_query else output
        state["error"] = ""
    except Exception as e:
        state["error"] = str(e)
    
    return state

# Example usage:

# Assuming you have created the prompt_template and have the following variables defined:
# schema_info_str, question, and db_dialect


In [29]:
# Create an initial state
state: State = {
    "schema_info": schema_info_str,
    "question": question,
    "db_dialect": db_dialect,
    "parsed_question": "",
    "query": "",
    "execution_result": "",
    "answer": "",
    "error": ""
}

In [30]:
# Invoke the chain to update the state with the generated SQL query
state = extract_sql_query_state(prompt_template, state)

print("Generated SQL Query:")
print(state["query"])
if state["error"]:
    print("Error encountered:", state["error"])


Generated SQL Query:

Error encountered: expected string or bytes-like object, got 'dict'


In [36]:
# Wenn output ein Dictionary ist und den Schlüssel "text" enthält, verwende dessen Inhalt
if isinstance(output, dict) and "text" in output:
    sql_query = extract_sql_statement(output["text"])
else:
    sql_query = extract_sql_statement(output)


In [37]:
def extract_sql_query_state(prompt_template, state: State) -> State:
    chain = LLMChain(llm=llm, prompt=prompt_template)
    
    try:
        output = chain.invoke({
            "schema_info": state["schema_info"],
            "question": state["question"],
            "db_dialect": state["db_dialect"]
        })
        
        # Extrahiere den SQL-Text aus dem Dictionary, falls vorhanden
        if isinstance(output, dict) and "text" in output:
            sql_query = extract_sql_statement(output["text"])
        else:
            sql_query = extract_sql_statement(output)
        
        state["query"] = sql_query if sql_query else output
        state["error"] = ""
    except Exception as e:
        state["error"] = str(e)
    
    return state


In [38]:
extract_sql_query_state(prompt_template, state)

{'schema_info': "{'Album': {'columns': ['AlbumId INTEGER (PK)', 'Title NVARCHAR(160)', 'ArtistId INTEGER'], 'foreign_keys': ['ArtistId -> Artist.ArtistId'], 'sample_data': [(1, 'For Those About To Rock We Salute You', 1)]}, 'Artist': {'columns': ['ArtistId INTEGER (PK)', 'Name NVARCHAR(120)'], 'foreign_keys': [], 'sample_data': [(1, 'AC/DC')]}, 'Customer': {'columns': ['CustomerId INTEGER (PK)', 'FirstName NVARCHAR(40)', 'LastName NVARCHAR(20)', 'Company NVARCHAR(80)', 'Address NVARCHAR(70)', 'City NVARCHAR(40)', 'State NVARCHAR(40)', 'Country NVARCHAR(40)', 'PostalCode NVARCHAR(10)', 'Phone NVARCHAR(24)', 'Fax NVARCHAR(24)', 'Email NVARCHAR(60)', 'SupportRepId INTEGER'], 'foreign_keys': ['SupportRepId -> Employee.EmployeeId'], 'sample_data': [(1, 'Luís', 'Gonçalves', 'Embraer - Empresa Brasileira de Aeronáutica S.A.', 'Av. Brigadeiro Faria Lima, 2170', 'São José dos Campos', 'SP', 'Brazil', '12227-000', '+55 (12) 3923-5555', '+55 (12) 3923-5566', 'luisg@embraer.com.br', 3)]}, 'Employe