In [47]:
import re

import sqlglot
from IPython.display import display, Markdown

In [66]:
import sys
from IPython.display import display, Markdown

def display_ansi_formatted_text(ansi_text, html_font_size="30"):
    """Show text carrying ANSI underline codes in a way that suits the runtime.

    If the ``ipykernel`` module has been loaded, the underline-start
    (``ESC[4m``) and reset (``ESC[0m``) sequences are each replaced with
    ``**`` and the result is rendered as Markdown, so the underlined span
    shows up bold. Otherwise the text is printed unchanged and the terminal
    is left to interpret the escape codes.

    Args:
        ansi_text: Text that may contain ``ESC[4m`` / ``ESC[0m`` sequences.
        html_font_size: Accepted for callers that pass it, but not used by
            this implementation.

    Returns:
        None. The text is displayed or printed as a side effect.
    """
    running_in_kernel = 'ipykernel' in sys.modules
    if not running_in_kernel:
        # Console: emit the raw text, escape sequences included.
        print(ansi_text)
        return

    markdown_source = ansi_text
    for escape_code in ("\x1b[4m", "\x1b[0m"):
        # Opening and closing codes both become the Markdown bold marker.
        markdown_source = markdown_source.replace(escape_code, "**")
    display(Markdown(markdown_source))

# Example usage: a string shaped like the context line of a sqlglot parse
# error, where one token is wrapped in ANSI underline (ESC[4m) and reset
# (ESC[0m) codes.
ansi_string = "SELECT baz FROM \x1b[4mt\x1b[0m"
display_ansi_formatted_text(ansi_string)


SELECT baz FROM **t**

In [48]:
def clean_sql_simple(sql):
    """Normalise ``sql`` by transpiling it from DuckDB to DuckDB.

    Args:
        sql: SQL text to normalise.

    Returns:
        The first statement produced by ``sqlglot.transpile``.

    No error handling is done here; whatever ``sqlglot.transpile`` raises
    propagates to the caller.
    """
    statements = sqlglot.transpile(sql, read="duckdb", write="duckdb")
    return statements[0]

In [49]:
def check_sql(sql):
    """Normalise ``sql`` as DuckDB SQL, reporting parse errors instead of raising.

    Args:
        sql: SQL text to transpile (DuckDB in, DuckDB out).

    Returns:
        The first transpiled statement on success, or ``None`` when sqlglot
        raises ``ParseError``. In the failure case a two-line message is
        printed: the error description, then the query text up to and
        including the offending token.

    NOTE(review): only ``sqlglot.errors.ParseError`` is handled here; any
    other exception raised by ``sqlglot.transpile`` still propagates.
    """
    try:
        return sqlglot.transpile(sql, read="duckdb", write="duckdb")[0]
    except sqlglot.errors.ParseError as e:
        # A ParseError may carry no structured details; fall back to its
        # message rather than failing with IndexError inside the handler.
        details = e.errors[0] if e.errors else {}
        description = details.get("description") or str(e)

        line = details.get("line") or 1
        col = details.get("col")
        if col is not None and line <= 1:
            # Single-line case: the column doubles as an offset into `sql`.
            snippet = sql[:col]
        else:
            # `col` is relative to the start of its own line, so slicing the
            # whole string with it would cut a multi-line query in the wrong
            # place. Use the context sqlglot recorded around the error instead.
            snippet = (details.get("start_context") or "") + (details.get("highlight") or "")
            if not snippet:
                snippet = sql
        print(f"SQL ERROR: {description}\nQuery: '{snippet}'")
        return None

In [50]:
# Deliberately malformed: the subquery's opening parenthesis is never closed.
test_sql_with_error = "SELECT foo FROM (SELECT baz FROM t"

In [51]:
# Failure path: check_sql prints the parse error and returns None, so only
# printed text appears below.
check_sql(test_sql_with_error)

SQL ERROR: Expecting )
Query: 'SELECT foo FROM (SELECT baz FROM t'


In [52]:
# Valid query with irregular keyword casing and spacing, used to show normalisation.
formatting_sql = "selECT   * from Tablename"

In [53]:
# Success path: the transpiled SQL string is returned.
check_sql(formatting_sql)

'SELECT * FROM Tablename'

In [54]:
def validate_sql(sql, disallowed_keywords=None):
    """Check that ``sql`` is one parseable DuckDB statement with no banned keywords.

    Args:
        sql: SQL text to validate.
        disallowed_keywords: Optional iterable of keywords to reject, e.g. the
            list loaded from the app config. When omitted, a built-in list is
            used. Matching is case-insensitive and on whole words only.

    Returns:
        ``(True, "Query is valid")`` on success, otherwise ``(False, message)``
        where ``message`` is the text of whatever exception was raised
        (parse errors included). This function never raises.

    Notes:
        Keywords are matched against the SQL regenerated from the parsed
        statement, so a keyword appearing inside a string literal is still
        rejected. Whole-word matching means identifiers that merely contain a
        keyword (``updated_at``, ``dropdown``) are accepted.
    """
    if disallowed_keywords is None:
        disallowed_keywords = [
            "INSERT",
            "UPDATE",
            "DELETE",
            "DROP",
            "EXEC",
            # Listed explicitly because whole-word matching on "EXEC" no
            # longer covers it the way substring matching did.
            "EXECUTE",
            "CALL",
            "ALTER",
            "GRANT",
        ]
    try:
        expressions = sqlglot.parse(sql, dialect="duckdb")
        print(f"Expressions: {expressions}")

        # sqlglot represents an empty statement (empty input, doubled ";") as
        # None; drop those so they neither count as statements nor crash below.
        statements = [expression for expression in expressions if expression is not None]
        if not statements:
            raise ValueError("No SQL statement found")
        if len(statements) > 1:
            raise ValueError("Only one statement is allowed")

        keywords = [re.escape(str(keyword)) for keyword in disallowed_keywords if keyword]
        if keywords:
            # An empty alternation would match at every word boundary, hence
            # the guard above.
            banned = re.compile(r"\b(?:" + "|".join(keywords) + r")\b", re.IGNORECASE)
            for statement in statements:
                if banned.search(statement.sql()):
                    raise ValueError("Disallowed SQL keywords detected")
        return True, "Query is valid"
    except Exception as e:
        # Deliberate catch-all: every failure is reported as a (False, message) pair.
        return False, str(e)

In [55]:
# Valid single statement: the parsed expressions are printed, then
# (True, 'Query is valid') is returned.
validate_sql(formatting_sql)

Expressions: [Select(
  expressions=[
    Star()],
  from=From(
    this=Table(
      this=Identifier(this=Tablename, quoted=False))))]


(True, 'Query is valid')

In [56]:
# Malformed SQL: the parse error is caught and returned as (False, message).
# The message still contains sqlglot's ANSI underline codes.
validate_sql(test_sql_with_error)

(False,
 'Expecting ). Line 1, Col: 34.\n  SELECT foo FROM (SELECT baz FROM \x1b[4mt\x1b[0m')

In [71]:
def check_and_validate_sql(sql):
    """Validate ``sql`` and, only if it passes, return its normalised form.

    Args:
        sql: SQL text to process.

    Returns:
        The result of ``check_sql(sql)`` when ``validate_sql`` accepts the
        query. Otherwise the validation message is shown through
        ``display_ansi_formatted_text`` as a ``#### ERROR:`` heading and that
        call's return value is passed back.
    """
    is_valid, message = validate_sql(sql)
    if not is_valid:
        return display_ansi_formatted_text(f"#### ERROR: {message}")
    return check_sql(sql)

In [72]:
# Valid query: passes validation, then check_sql returns the normalised SQL.
check_and_validate_sql(formatting_sql)

Expressions: [Select(
  expressions=[
    Star()],
  from=From(
    this=Table(
      this=Identifier(this=Tablename, quoted=False))))]


'SELECT * FROM Tablename'

In [73]:
# Invalid query: the validation message is shown via
# display_ansi_formatted_text and check_sql is not called.
check_and_validate_sql(test_sql_with_error)

#### ERROR: Expecting ). Line 1, Col: 34.
  SELECT foo FROM (SELECT baz FROM **t**

In [102]:
import tomllib
from pathlib import Path
from loguru import logger

def read_config(parent_dir=0):
    config_toml = Path.cwd().parents[parent_dir] / "app" / "app.toml"
    if not config_toml.exists():
        logger.error(f"File Not Found - {str(config_toml)}")
        raise FileNotFoundError(f"{str(config_toml)}")
    with open(config_toml, 'rb') as f:
        logger.info(f"Config loaded from {str(config_toml)}")
        return tomllib.load(f)


In [104]:
# parent_dir defaults to 0, so app/app.toml is looked up in the directory
# directly above the current working directory.
config = read_config()

[32m2024-04-22 15:04:28.534[0m | [1mINFO    [0m | [36m__main__[0m:[36mread_config[0m:[36m11[0m - [1mConfig loaded from /Users/mjboothaus/code/github/mjboothaus/sql-8week-danny/app/app.toml[0m


In [98]:
# NOTE(review): the 'tabs' value shown below contains 'Scehma', which looks
# like a typo for 'Schema' in app.toml — confirm and fix at the source.
config['app']

{'title': 'Data with Danny - Week 1',
 'create_sql': 'week1.sql',
 'prompt_name': 'User',
 'response_name': 'Result',
 'tabs': ['Query', 'SQL', 'Scehma']}

In [87]:
# Keyword deny-list as stored in app.toml. validate_sql carries its own copy
# of these eight keywords rather than reading this value.
config['sql']['disallowed_keywords']

['ALTER', 'CALL', 'DELETE', 'DROP', 'EXEC', 'GRANT', 'INSERT', 'UPDATE']