In [1]:
import sqlite3, os, json
from sentence_transformers import SentenceTransformer, util

In [2]:
# Root folder expected to hold one sub-directory per database.
folder_path = "src/spider/database"
# Names of the database sub-directories whose .sqlite files should be collected.
# NOTE(review): 'company_offic' looks like a typo (possibly 'company_office'); as written
# it only matches a directory with exactly this name -- confirm before changing.
select_db = ['musical',
             'farm',
             'hospital_1',
             'tvshow',
             'cinema',
             'restaurants',
             'company_employee',
             'company_1',
             'company_offic',
             'singer',
             'coffee_shop']

# Paths of every .sqlite file found inside the selected database folders.
db = []

# isdir() is False for a missing path, so no separate exists() check is needed.
if os.path.isdir(folder_path):
    files = os.listdir(folder_path)
    for file in files:
        if file not in select_db:
            continue
        db_path = os.path.join(folder_path, file)
        if not os.path.isdir(db_path):
            # A plain file that happens to share a selected name has nothing to list.
            continue
        sqlite_db = [os.path.join(db_path, sql) for sql in os.listdir(db_path) if sql.endswith(".sqlite")]
        # extend() copes with zero or several matches; append(*sqlite_db) raised
        # TypeError unless the folder contained exactly one match.
        db.extend(sqlite_db)

In [3]:
# db

['spider/database/musical/musical.sqlite',
 'spider/database/farm/farm.sqlite',
 'spider/database/hospital_1/hospital_1.sqlite',
 'spider/database/tvshow/tvshow.sqlite',
 'spider/database/cinema/cinema.sqlite',
 'spider/database/restaurants/restaurants.sqlite',
 'spider/database/company_employee/company_employee.sqlite',
 'spider/database/company_1/company_1.sqlite',
 'spider/database/coffee_shop/coffee_shop.sqlite',
 'spider/database/singer/singer.sqlite']

In [4]:
def get_schema(sqlite_db):
    """Print every table of a SQLite database together with its column names.

    Args:
        sqlite_db: Database location handed to ``sqlite3.connect``.

    Returns:
        None. For each table listed in ``sqlite_master`` a ``Table: <name>`` line is
        printed, followed by one ``  Column: <name>`` line per column and a blank line.

    Raises:
        sqlite3.Error: If the database cannot be opened or queried. The cursor and the
            connection are closed in that case as well.
    """
    connection = sqlite3.connect(sqlite_db)
    try:
        cursor = connection.cursor()
        try:
            cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
            table_names = [row[0] for row in cursor.fetchall()]

            for table_name in table_names:
                print(f"Table: {table_name}")

                # PRAGMA arguments cannot be bound as parameters, so quote the identifier
                # (doubling embedded quotes) to support names with spaces or keywords.
                quoted_name = '"' + table_name.replace('"', '""') + '"'
                cursor.execute(f"PRAGMA table_info({quoted_name});")

                for column in cursor.fetchall():
                    # Index 1 of a table_info row is the column name.
                    print(f"  Column: {column[1]}")

                print()
        finally:
            cursor.close()
    finally:
        connection.close()

In [8]:
# for table in db:
#     get_schema(table)
#     print('---------------------------------')

In [2]:
# Folder holding the mock-up schema files used by the cells below.
src_folder = "src"
schema_description_file = "mockup_schema_description.json"
# Decode explicitly as UTF-8 (the standard JSON encoding) instead of relying on the
# platform's locale default, which differs between machines.
with open(os.path.join(src_folder, schema_description_file), encoding="utf-8") as f:
    # Other cells index dbs by position and read each entry's 'table' and 'columns' keys.
    dbs = json.load(f)
# Sentence-embedding model used to encode questions (and, originally, the descriptions).
model = SentenceTransformer('sentence-transformers/all-MiniLM-L6-v2')

In [56]:
# description_emb = []

# for db in dbs:
#     schema_emb = {}
#     table_name = db['table']
#     table_description = db['description']
#     schema_emb[table_name] = model.encode(table_description).tolist()
#     columns = list(db['columns'].keys())
#     for col in columns:
#         column_description = db['columns'][col]
#         schema_emb[col] = model.encode(column_description).tolist()
#     description_emb.append(schema_emb)

# schema_vector_file = "mockup_schema_description_vector.json"
# with open(os.path.join(src_folder, schema_vector_file), "w") as f:
#     json.dump(description_emb,f)

In [12]:
# Pre-computed embedding vectors for the schema descriptions, stored as JSON.
schema_vector_file = "mockup_schema_description_vector.json"
# Decode explicitly as UTF-8 (the standard JSON encoding) rather than the locale default.
with open(os.path.join(src_folder, schema_vector_file), encoding="utf-8") as f:
    # Other cells treat this as a list of dicts whose first key is the table name.
    schema_vector = json.load(f)

In [63]:
# filter_tables: optionally drop tables by description similarity first, then keep matching columns
def filter_tables(question, column_threshold = 0.4, table_threshold = 0.2, filter_tables = True):
    """Select the tables and columns whose description embeddings match a question.

    Reads the module-level ``model``, ``schema_vector`` and ``dbs``. Every selected
    column is also printed with its similarity score and its description.

    Args:
        question: Text to encode with ``model.encode``.
        column_threshold: A column is kept when its cosine similarity to the question
            is strictly greater than this value.
        table_threshold: With ``filter_tables`` enabled, a table is skipped when the
            similarity of its own description is strictly below this value.
        filter_tables: Whether to apply the table-level pre-filter. (Inside this body
            the name refers to this flag, not to the function itself.)

    Returns:
        dict mapping table name -> list of selected column names. Tables without any
        selected column are omitted.
    """
    question_emb = model.encode(question)
    used_schema = {}
    for i, table_vectors in enumerate(schema_vector):
        if not table_vectors:
            # An empty entry has no table key to read.
            continue
        # The first key of each entry is the table name; the remaining keys are columns.
        table_name = next(iter(table_vectors))

        table_description_vector = table_vectors[table_name]
        if filter_tables and float(util.cos_sim(table_description_vector, question_emb)) < table_threshold:
            continue

        # Prefer the entry at the same position in dbs: searching by table name alone
        # picks the wrong entry (or raises KeyError) when two entries share a table name.
        # Fall back to a by-name search only if the two lists are not aligned.
        if i < len(dbs) and dbs[i]['table'] == table_name:
            column_descriptions = dbs[i]['columns']
        else:
            column_descriptions = next(
                (entry['columns'] for entry in dbs if entry['table'] == table_name), {}
            )

        used_col = []
        for col, vec in table_vectors.items():
            if col == table_name:
                continue
            score = float(util.cos_sim(vec, question_emb))
            if score > column_threshold:
                # The description is only used for the diagnostic print below.
                column_description = column_descriptions.get(col, "")
                print(f"{table_name} - {col} : {score:.2f}\nDescription : {column_description}\n")
                used_col.append(col)
        if used_col:
            used_schema[table_name] = used_col
    return used_schema

In [64]:
# Sample natural-language question passed to filter_tables in the next cell.
question = "Count singers who born at French"

In [68]:
# Run the filter with a column threshold of 0.3 (below the 0.4 default) and the
# table-level pre-filter switched on.
filter_tables(question, column_threshold = 0.3, filter_tables = True)

people - Nationality : 0.35
Description : Nationality of the person

singer - Singer_ID : 0.39
Description : Unique identifier for the singer

singer - Name : 0.45
Description : Name of the singer

singer - Birth_Year : 0.54
Description : Year of birth of the singer

singer - Net_Worth_Millions : 0.39
Description : Net worth of the singer in millions

singer - Citizenship : 0.45
Description : Citizenship of the singer

song - Singer_ID : 0.42
Description : Identifier of the singer who performed the song



{'people': ['Nationality'],
 'singer': ['Singer_ID',
  'Name',
  'Birth_Year',
  'Net_Worth_Millions',
  'Citizenship'],
 'song': ['Singer_ID']}

In [78]:
# Inspect the token ids that the embedding model's tokenizer produces for a sample sentence.
model.tokenizer('hello my name is Thanawat')['input_ids']

[101, 7592, 2026, 2171, 2003, 2084, 10830, 2102, 102]

In [79]:
# Sample English paragraph used to try out the tokenizer in the cells below.
test_text = """SCB TechX, an SCBX company specializing in digital technology, has introduced PointX, the latest platform development. 
It is a new world that lets reward point collectors experience unlimited reward point accumulation and redemption. 
With a concept of all-in-one reward point wallet platform, customers can use reward points like cash for purchases at any shops displaying the PointX logo, 
get discount coupons for shopping for flash deals and special priced items via an in-app X Store, transfer and share reward points, 
and enjoy using reward points at bonus rates every day. The PointX application is starting pilot services for Siam Commercial Bank (SCB) credit cardholders with reward point programs before rolling out to SCB’s other customer segments and other business partners in the future."""

In [81]:
# NOTE(review): this import sits mid-notebook rather than in the first cell, and
# ElectraForPreTraining is not used in any visible cell -- confirm whether it is needed.
from transformers import ElectraTokenizer, ElectraForPreTraining
# Load the ELECTRA tokenizer from "models/electra-large-discriminator"
# (presumably a local directory -- confirm it exists relative to the notebook).
tokenizer = ElectraTokenizer.from_pretrained("models/electra-large-discriminator")

In [83]:
# Show the first 10 tokens the ELECTRA tokenizer produces for test_text.
tokenizer.tokenize(test_text)[:10]

['sc', '##b', 'tech', '##x', ',', 'an', 'sc', '##b', '##x', 'company']

In [98]:
def table_of_column(column):
    """Build one ``{table name: [column names]}`` mapping per schema entry.

    The number of mappings follows ``len(schema_vector)``, while the table and column
    names are read from the module-level ``dbs`` at the same index.

    Args:
        column: Accepted but not consulted by the current implementation.

    Returns:
        list of single-key dicts, in schema order.
    """
    return [
        {dbs[idx]['table']: list(dbs[idx]['columns'].keys())}
        for idx in range(len(schema_vector))
    ]

In [197]:
# string matching; the score is the weight given to a string match
def column_from_question(question, default_score=0.6):
    """Find schema columns whose names literally appear as words in the question.

    The question is split on whitespace and lower-cased, and the token list is
    printed. For every entry of the module-level ``schema_vector`` (first key = table
    name, remaining keys = column names) each token equal to a lower-cased column name
    is recorded.

    Args:
        question: Question text.
        default_score: Score given to a matched column when its table name is not
            itself one of the question tokens.

    Returns:
        dict mapping lower-cased table name -> {lower-cased column name: score}. The
        score is 1.0 when the table name also appears in the question, wherever it
        occurs, and ``default_score`` otherwise.
    """
    question_tokens = [token.lower() for token in question.split()]
    print(question_tokens)

    used_table_col = {}
    for table in schema_vector:
        keys = [key.lower() for key in table.keys()]
        if not keys:
            # An empty entry has no table name to pop.
            continue
        table_name, cols = keys[0], set(keys[1:])

        # Decide the score once per table. The previous in-loop upgrade rebound the
        # whole result dict to a single table's columns whenever a column token came
        # before the table-name token, corrupting the returned structure.
        score = 1.0 if table_name in question_tokens else default_score

        for token in question_tokens:
            if token in cols:
                used_table_col.setdefault(table_name, {})[token] = score

    return used_table_col

In [198]:
# Try the string matcher on a question that names both a table and a column.
column_from_question("How many singer have singer_id ")

['how', 'many', 'singer', 'have', 'singer_id']


{'singer': {'singer_id': 1.0}, 'song': {'singer_id': 0.6}}

In [94]:
# Column names of the first entry in dbs.
list(dbs[0]['columns'].keys())

['Musical_ID', 'Name', 'Year', 'Award', 'Category', 'Nominee', 'Result']

In [113]:
# Keys of the first schema_vector entry, in stored order.
cols = list(schema_vector[0].keys())
print(cols)
# Remove the first key from cols and keep it in t; the remaining keys stay in cols.
t = cols.pop(0)

['musical', 'Musical_ID', 'Name', 'Year', 'Award', 'Category', 'Nominee', 'Result']


In [118]:
# Display cols as left by the earlier cell that popped its first element.
cols

['Musical_ID', 'Name', 'Year', 'Award', 'Category', 'Nominee', 'Result']

In [164]:
my_dict = {'new_key': ['ab']}  # Start with a dictionary that already contains the key
key = 'new_key'

my_dict.setdefault(key, [])  # No change here: setdefault only inserts the default when the key is missing
my_dict

{'new_key': ['ab']}