In [1]:
import os
import openai
import warnings
from langchain import OpenAI, SQLDatabase
from langchain.sql_database import SQLDatabase
from langchain_experimental.sql import SQLDatabaseChain
import toml
import os 
import ast


warnings.filterwarnings("ignore")

In [2]:
config = toml.load("config.toml")
api_key = config["openai"]["api_key"]
os.environ["OPENAI_API_KEY"] = api_key
openai.api_key = os.getenv("OPENAI_API_KEY")

In [3]:
sqlite_db_path = "Spotify.db"
db = SQLDatabase.from_uri(f"sqlite:///{sqlite_db_path}")

In [4]:
llm = OpenAI(temperature=0)

In [5]:
db_chain = SQLDatabaseChain.from_llm(llm, db, verbose=True)

In [7]:
result = db_chain.run("How many kinds of musics are there?")



[1m> Entering new SQLDatabaseChain chain...[0m
How many kinds of musics are there?
SQLQuery:[32;1m[1;3mSELECT COUNT(DISTINCT track_genre) FROM musics[0m
SQLResult: [33;1m[1;3m[(114,)][0m
Answer:[32;1m[1;3mThere are 114 kinds of musics.[0m
[1m> Finished chain.[0m


In [8]:
result

'There are 114 kinds of musics.'

In [10]:
db_chain = SQLDatabaseChain.from_llm(llm, db, verbose=True, use_query_checker=True)
result = db_chain.run("How many salary are there?")




[1m> Entering new SQLDatabaseChain chain...[0m
How many salary are there?
SQLQuery:[32;1m[1;3mSELECT COUNT(*) FROM musics[0m
SQLResult: [33;1m[1;3m[(114000,)][0m
Answer:[32;1m[1;3m114000[0m
[1m> Finished chain.[0m


In [5]:
from langchain.prompts.prompt import PromptTemplate

_DEFAULT_TEMPLATE = """Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer.
Use the following format:

Question: "Question here"
SQLQuery: "SQL Query to run"
SQLResult: "Result of the SQLQuery"
Answer: "Final answer here"

Only use the following tables:
{table_info}

If table not exist return "null"

Question: {input}"""
PROMPT = PromptTemplate(
    input_variables=["input", "table_info", "dialect"], template=_DEFAULT_TEMPLATE)

In [8]:
db_chain = SQLDatabaseChain.from_llm(llm, db, prompt=PROMPT, verbose=True, use_query_checker=True)
result = db_chain.run("kitap tablosunda kaç farklı sanatçı var ?")




[1m> Entering new SQLDatabaseChain chain...[0m
 kitap tablosunda kaç farklı sanatçı var ?
SQLQuery:[32;1m[1;3mSELECT COUNT(DISTINCT artists) FROM musics[0m
SQLResult: [33;1m[1;3m[(31437,)][0m
Answer:[32;1m[1;3m31437[0m
[1m> Finished chain.[0m


In [26]:
PROMPT = PromptTemplate(
    input_variables=["input", "table_info", "dialect"], template=_DEFAULT_TEMPLATE)


In [14]:
def table_exists(table_name, db):
    # Mevcut tablo adlarını al
    existing_tables = db.run("SELECT name FROM sqlite_master WHERE type='table';")
    table_list = ast.literal_eval(existing_tables)
    # Tablo adı listede mi kontrol et
    table_names = [table[0] for table in table_list]
    print("Table names list:", table_names)
    
    # Tam eşleşme kontrolü
    return table_name in table_names

def column_exists(table_name, column_name, db):
    # Mevcut tablo adlarını kontrol et
    if not table_exists(table_name, db):
        return False
    
    # Belirtilen tablonun kolon adlarını al
    column_info = db.run(f"PRAGMA table_info({table_name});")
    print(f"Column info for table {table_name}:", column_info)
    
    # Gelen string'i değerlendirerek tuple listesini oluşturma
    column_list = ast.literal_eval(column_info)
    
    # Kolon adı listede mi kontrol et
    column_names = [column[1] for column in column_list]  # İkinci indeks kolon adını tutar
    print("Column names list:", column_names)
    
    # Tam eşleşme kontrolü
    return column_name in column_names

db_chain = SQLDatabaseChain.from_llm(llm, db, prompt=PROMPT, verbose=True, use_query_checker=True)


In [16]:
db_chain

SQLDatabaseChain(verbose=True, llm_chain=LLMChain(prompt=PromptTemplate(input_variables=['dialect', 'input', 'table_info'], template='Given an input question, first create a syntactically correct {dialect} query to run, then look at the results of the query and return the answer.\nUse the following format:\n\nQuestion: "Question here"\nSQLQuery: "SQL Query to run"\nSQLResult: "Result of the SQLQuery"\nAnswer: "Final answer here"\n\nOnly use the following tables:\n{table_info}\n\nIf table not exist return "null"\n\nQuestion: {input}'), llm=OpenAI(client=<openai.resources.completions.Completions object at 0x7fe0201c04f0>, async_client=<openai.resources.completions.AsyncCompletions object at 0x7fdffaa09460>, temperature=0.0, openai_api_key='sk-proj-fNrYR9Otfk3LoNAhjW8UT3BlbkFJxHayU6zlR9FCVhhHW1z3', openai_proxy='')), database=<langchain_community.utilities.sql_database.SQLDatabase object at 0x7fdffaa98700>, use_query_checker=True)

In [None]:
# Sorguyu çalıştırma fonksiyonu
def run_query_with_check(query, db_chain, db):
    # Sorudan tablo adını çıkar
    print(query)
    import re
    table_name = re.search(r"FROM (\w+)", query)
    print("table name",table_name)
    if table_name:
        table_name = table_name.group(1)
    else:
        return "Invalid query format."

    # Tablo var mı kontrol et
    if not table_exists(table_name, db):
        return "Wrong table name"
    
    # Tablo varsa sorguyu çalıştır
    return db_chain.run(query)


In [15]:
query = "kaç farklı sanatçı var ?"
result = run_query_with_check(query, db_chain, db)
result

kaç farklı sanatçı var ?
table name None


'Invalid query format.'