From a786b6ec368ead7753a9d578bf23b59863534da3 Mon Sep 17 00:00:00 2001 From: Danil Date: Sun, 12 Mar 2023 15:11:10 +0300 Subject: [PATCH] Update querybuilder.py --- simple_query_builder/querybuilder.py | 293 +++++++++++++++++---------- 1 file changed, 185 insertions(+), 108 deletions(-) diff --git a/simple_query_builder/querybuilder.py b/simple_query_builder/querybuilder.py index 62d18ab..0c53983 100644 --- a/simple_query_builder/querybuilder.py +++ b/simple_query_builder/querybuilder.py @@ -4,10 +4,10 @@ :copyright: (c) 2022 co0lc0der """ +import inspect import sqlite3 -import traceback import sys -import inspect +import traceback from typing import Union @@ -21,12 +21,12 @@ def __call__(cls, *args, **kwargs): class DataBase(metaclass=MetaSingleton): - db_name = 'db.db' + db_name = "db.db" conn = None cursor = None - def connect(self, db_name=''): - if db_name != '': + def connect(self, db_name=""): + if db_name != "": self.db_name = db_name if self.conn is None: @@ -40,10 +40,21 @@ def c(self): class QueryBuilder: - _OPERATORS: list = ['=', '>', '<', '>=', '<=', '!=', 'LIKE', 'NOT LIKE', 'IN', 'NOT IN'] - _LOGICS: list = ['AND', 'OR', 'NOT'] - _SORT_TYPES: list = ['ASC', 'DESC'] - _JOIN_TYPES: list = ['INNER', 'LEFT OUTER', 'RIGHT OUTER', 'FULL OUTER', 'CROSS'] + _OPERATORS: list = [ + "=", + ">", + "<", + ">=", + "<=", + "!=", + "LIKE", + "NOT LIKE", + "IN", + "NOT IN", + ] + _LOGICS: list = ["AND", "OR", "NOT"] + _SORT_TYPES: list = ["ASC", "DESC"] + _JOIN_TYPES: list = ["INNER", "LEFT OUTER", "RIGHT OUTER", "FULL OUTER", "CROSS"] _NO_FETCH: int = 0 _FETCH_ONE: int = 1 _FETCH_ALL: int = 2 @@ -51,20 +62,22 @@ class QueryBuilder: _conn = None _cur = None _query = None - _sql: str = '' + _sql: str = "" _error: bool = False - _error_message: str = '' + _error_message: str = "" _result: Union[tuple, list] = [] _count: int = -1 _params: tuple = () - def __init__(self, database: DataBase, db_name='') -> None: + def __init__(self, database: DataBase, db_name="") -> None: self._conn = database.connect(db_name) # self._conn.row_factory = sqlite3.Row - # self._conn.row_factory = lambda c, r: dict([(col[0], r[idx]) for idx, col in enumerate(c.description)]) - self._cur = database.c() + self._conn.row_factory = lambda c, r: dict( + [(col[0], r[idx]) for idx, col in enumerate(c.description)] + ) + self._cur = self._conn.cursor() - def query(self, sql: str = '', params=(), fetch=2, column=0): + def query(self, sql: str = "", params=(), fetch=2, column=0): if fetch == 2: fetch = self._FETCH_ALL @@ -97,27 +110,35 @@ def query(self, sql: str = '', params=(), fetch=2, column=0): self.set_error() except sqlite3.Error as er: self._error = True - print('SQLite error: %s' % (' '.join(er.args))) + print("SQL: %s" % self._sql) + print("Params: %s" % str(self._params)) + print("SQLite error: %s" % (" ".join(er.args))) print("Exception class is: ", er.__class__) - print('SQLite traceback: ') + print("SQLite traceback: ") exc_type, exc_value, exc_tb = sys.exc_info() print(traceback.format_exception(exc_type, exc_value, exc_tb)) return self - def add_semicolon(self, sql: str = '') -> str: - new_sql = self._sql if sql == '' else sql + def add_semicolon(self, sql: str = "") -> str: + new_sql = self._sql if sql == "" else sql - if new_sql != '': - new_sql += ';' if new_sql[-1] != ';' else '' + if new_sql != "": + new_sql += ";" if new_sql[-1] != ";" else "" - if sql == '': + if sql == "": self._sql = new_sql return new_sql def get_sql(self) -> str: - return self._sql + # Replace ? with markers + sql = self._sql + params = self._params + if params: + for p in params: + sql = sql.replace("?", str(p), 1) + return sql def get_error(self) -> bool: return self._error @@ -125,8 +146,8 @@ def get_error(self) -> bool: def get_error_message(self) -> str: return self._error_message - def set_error(self, message: str = '') -> None: - self._error = message != '' + def set_error(self, message: str = "") -> None: + self._error = message != "" self._error_message = message def get_params(self) -> tuple: @@ -139,7 +160,7 @@ def get_count(self) -> int: return self._count def reset(self) -> bool: - self._sql = '' + self._sql = "" self._params = () self._query = None self._result = [] @@ -160,22 +181,22 @@ def go(self) -> Union[int, None]: return self._cur.lastrowid def column(self, column: int = 0) -> Union[tuple, list, dict, None]: - self.query('', (), self._FETCH_COLUMN, column) + self.query("", (), self._FETCH_COLUMN, column) return self._result def pluck(self, key: int = 0, column: int = 1) -> Union[tuple, list, dict, None]: self.query() return [(x[key], x[column]) for x in self._result] - def count(self, table: Union[str, dict], field: str = ''): - if table == '' or table == {}: + def count(self, table: Union[str, dict], field: str = ""): + if table == "" or table == {}: self.set_error(f"Empty table in {inspect.stack()[0][3]} method") return self - if field == '': + if field == "": self.select(table, "COUNT(*) AS `counter`") else: - field = field.replace('.', '`.`') + field = field.replace(".", "`.`") self.select(table, f"COUNT(`{field}`) AS `counter`") return self.one()[0] @@ -191,10 +212,12 @@ def exists(self) -> bool: result = self.one() return self._count > 0 - def _prepare_aliases(self, items: Union[str, list, dict], as_list: bool = False) -> Union[str, list]: - if items == '' or items == {} or items == []: + def _prepare_aliases( + self, items: Union[str, list, dict], as_list: bool = False + ) -> Union[str, list]: + if items == "" or items == {} or items == []: self.set_error(f"Empty items in {inspect.stack()[0][3]} method") - return '' + return "" sql = [] if isinstance(items, str): @@ -207,19 +230,27 @@ def _prepare_aliases(self, items: Union[str, list, dict], as_list: bool = False) elif isinstance(item, dict): first_item = list(item.values())[0] alias = list(item.keys())[0] - sql.append(f"{first_item}" if isinstance(alias, int) else f"{first_item} AS {alias}") + sql.append( + f"{first_item}" + if isinstance(alias, int) + else f"{first_item} AS {alias}" + ) elif isinstance(items, dict): new_item = items[item] - sql.append(f"{new_item}" if isinstance(item, int) else f"{new_item} AS {item}") + sql.append( + f"{new_item}" + if isinstance(item, int) + else f"{new_item} AS {item}" + ) else: self.set_error(f"Incorrect type of items in {inspect.stack()[0][3]} method") - return '' + return "" return self._prepare_fieldlist(sql) if not as_list else sql def _prepare_conditions(self, where: Union[str, list]) -> dict: - result = {'sql': '', 'values': []} - sql = '' + result = {"sql": "", "values": []} + sql = "" if not where: return result @@ -232,29 +263,38 @@ def _prepare_conditions(self, where: Union[str, list]) -> dict: if len(cond) == 2: field = self._prepare_field(cond[0]) value = cond[1] - if isinstance(value, list) or isinstance(value, tuple): - operator = 'IN' - values = ("?," * len(value)).rstrip(',') + + if value.lower() == "is null": + operator = "IS NULL" + sql += f"({field} {operator})" + elif value.lower() == "is not null": + operator = "IS NOT NULL" + sql += f"({field} {operator})" + elif isinstance(value, list) or isinstance(value, tuple): + operator = "IN" + values = ("?," * len(value)).rstrip(",") sql += f"({field} {operator} ({values}))" for item in value: - result['values'].append(item) + result["values"].append(item) else: - operator = '=' + operator = "=" sql += f"({field} {operator} ?)" - result['values'].append(value) + result["values"].append(value) elif len(cond) == 3: field = self._prepare_field(cond[0]) operator = cond[1].upper() value = cond[2] if operator in self._OPERATORS: - if operator == 'IN' and (isinstance(value, list) or isinstance(value, tuple)): - values = ("?," * len(value)).rstrip(',') + if operator == "IN" and ( + isinstance(value, list) or isinstance(value, tuple) + ): + values = ("?," * len(value)).rstrip(",") sql += f"({field} {operator} ({values}))" for item in value: - result['values'].append(item) + result["values"].append(item) else: sql += f"({field} {operator} ?)" - result['values'].append(value) + result["values"].append(value) elif isinstance(cond, str): upper = cond.upper() if upper in self._LOGICS: @@ -263,50 +303,58 @@ def _prepare_conditions(self, where: Union[str, list]) -> dict: self.set_error(f"Incorrect type of where in {inspect.stack()[0][3]} method") return result - result['sql'] = sql + result["sql"] = sql return result - def select(self, table: Union[str, dict], fields: Union[str, list, dict] = '*'): - if table == '' or table == {} or fields == '' or fields == [] or fields == {}: + def select(self, table: Union[str, dict], fields: Union[str, list, dict] = "*"): + if table == "" or table == {} or fields == "" or fields == [] or fields == {}: self.set_error(f"Empty table or fields in {inspect.stack()[0][3]} method") return self self.reset() - if isinstance(fields, dict) or isinstance(fields, list) or isinstance(fields, str): + if ( + isinstance(fields, dict) + or isinstance(fields, list) + or isinstance(fields, str) + ): self._sql = f"SELECT {self._prepare_aliases(fields)}" else: - self.set_error(f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary") + self.set_error( + f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary" + ) return self if isinstance(table, dict) or isinstance(table, str): self._sql += f" FROM {self._prepare_aliases(table)}" else: - self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") + self.set_error( + f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary" + ) return self return self - def where(self, where: Union[str, list], addition: str = ''): - if where == '' or where == []: + def where(self, where: Union[str, list], addition: str = ""): + if where == "" or where == []: self.set_error(f"Empty where in {inspect.stack()[0][3]} method") return self conditions = self._prepare_conditions(where) - if addition != '': + if addition != "": self._sql += f" WHERE {conditions['sql']} {addition}" else: self._sql += f" WHERE {conditions['sql']}" - if isinstance(conditions['values'], list) and conditions['values'] != []: - self._params += tuple(conditions['values']) + if isinstance(conditions["values"], list) and conditions["values"] != []: + self._params += tuple(conditions["values"]) return self def having(self, having: Union[str, list]): - if having == '' or having == []: + if having == "" or having == []: self.set_error(f"Empty having in {inspect.stack()[0][3]} method") return self @@ -314,8 +362,8 @@ def having(self, having: Union[str, list]): self._sql += f" HAVING {conditions['sql']}" - if isinstance(conditions['values'], list) and conditions['values'] != []: - self._params += tuple(conditions['values']) + if isinstance(conditions["values"], list) and conditions["values"] != []: + self._params += tuple(conditions["values"]) return self @@ -324,7 +372,7 @@ def like(self, cond: Union[str, tuple, list] = ()): if isinstance(cond, str): self.where(cond) elif isinstance(cond, tuple) or isinstance(cond, list): - self.where([[cond[0], 'LIKE', cond[1]]]) + self.where([[cond[0], "LIKE", cond[1]]]) return self def not_like(self, cond: Union[str, tuple, list] = ()): @@ -332,7 +380,7 @@ def not_like(self, cond: Union[str, tuple, list] = ()): if isinstance(cond, str): self.where(cond) elif isinstance(cond, tuple) or isinstance(cond, list): - self.where([[cond[0], 'NOT LIKE', cond[1]]]) + self.where([[cond[0], "NOT LIKE", cond[1]]]) return self def limit(self, limit: int = 1): @@ -343,37 +391,37 @@ def offset(self, offset: int = 0): self._sql += f" OFFSET {offset}" return self - def _prepare_sorting(self, field: str = '', sort: str = '') -> tuple: - if field.find(' ') > -1: - splitted = field.split(' ') + def _prepare_sorting(self, field: str = "", sort: str = "") -> tuple: + if field.find(" ") > -1: + splitted = field.split(" ") field = splitted[0] sort = splitted[1] field = self._prepare_field(field) - sort = 'ASC' if sort == '' else sort.upper() + sort = "ASC" if sort == "" else sort.upper() return field, sort - def _prepare_field(self, field: str = '') -> str: - if field == '': + def _prepare_field(self, field: str = "") -> str: + if field == "": self.set_error(f"Empty field in {inspect.stack()[0][3]} method") - return '' + return "" - if field.find('(') > -1 or field.find(')') > -1 or field.find('*') > -1: - if field.find(' AS ') > -1: - field = field.replace(' AS ', ' AS `') + if field.find("(") > -1 or field.find(")") > -1 or field.find("*") > -1: + if field.find(" AS ") > -1: + field = field.replace(" AS ", " AS `") return f"{field}`" else: return f"{field}" else: - field = field.replace('.', '`.`') - field = field.replace(' AS ', '` AS `') + field = field.replace(".", "`.`") + field = field.replace(" AS ", "` AS `") return f"`{field}`" def _prepare_fieldlist(self, fields: Union[str, tuple, list] = ()) -> str: - result = '' - if fields == '' or fields == () or fields == []: + result = "" + if fields == "" or fields == () or fields == []: self.set_error(f"Empty fields in {inspect.stack()[0][3]} method") return result @@ -381,12 +429,12 @@ def _prepare_fieldlist(self, fields: Union[str, tuple, list] = ()) -> str: result = self._prepare_field(fields) elif isinstance(fields, tuple) or isinstance(fields, list): fields = [self._prepare_field(field) for field in fields] - result = ', '.join(fields) + result = ", ".join(fields) return result - def order_by(self, field: Union[str, tuple, list] = (), sort: str = ''): - if field == '' or field == () or field == []: + def order_by(self, field: Union[str, tuple, list] = (), sort: str = ""): + if field == "" or field == () or field == []: self.set_error(f"Empty field in {inspect.stack()[0][3]} method") return self @@ -402,12 +450,12 @@ def order_by(self, field: Union[str, tuple, list] = (), sort: str = ''): for item in field: new_item = self._prepare_sorting(item) new_list.append(f"{new_item[0]} {new_item[1]}") - self._sql += ' ORDER BY ' + ', '.join(new_list) + self._sql += " ORDER BY " + ", ".join(new_list) return self def group_by(self, field: Union[str, tuple, list] = ()): - if field == '' or field == () or field == []: + if field == "" or field == () or field == []: self.set_error(f"Empty field in {inspect.stack()[0][3]} method") return self @@ -416,14 +464,16 @@ def group_by(self, field: Union[str, tuple, list] = ()): return self def delete(self, table: Union[str, dict]): - if table == '' or table == {}: + if table == "" or table == {}: self.set_error(f"Empty table in {inspect.stack()[0][3]} method") return self if isinstance(table, dict) or isinstance(table, str): table = self._prepare_aliases(table) else: - self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") + self.set_error( + f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary" + ) return self self.reset() @@ -432,28 +482,38 @@ def delete(self, table: Union[str, dict]): return self def insert(self, table: Union[str, dict], fields: Union[list, dict]): - if table == '' or table == {} or fields == [] or fields == {}: + if table == "" or table == {} or fields == [] or fields == {}: self.set_error(f"Empty table or fields in {inspect.stack()[0][3]} method") return self if isinstance(table, dict) or isinstance(table, str): table = self._prepare_aliases(table) else: - self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") + self.set_error( + f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary" + ) return self self.reset() if isinstance(fields, dict): - values = ("?," * len(fields)).rstrip(',') - self._sql = f"INSERT INTO {table} (" + self._prepare_fieldlist(list(fields.keys())) + f") VALUES ({values})" + values = ("?," * len(fields)).rstrip(",") + self._sql = ( + f"INSERT INTO {table} (" + + self._prepare_fieldlist(list(fields.keys())) + + f") VALUES ({values})" + ) self._params = tuple(fields.values()) elif isinstance(fields, list): names = fields.pop(0) - value = ("?," * len(names)).rstrip(',') + value = ("?," * len(names)).rstrip(",") v = f"({value})," - values = (v * len(fields)).rstrip(',') - self._sql = f"INSERT INTO {table} (" + self._prepare_fieldlist(names) + f") VALUES {values}" + values = (v * len(fields)).rstrip(",") + self._sql = ( + f"INSERT INTO {table} (" + + self._prepare_fieldlist(names) + + f") VALUES {values}" + ) params = [] for item in fields: if isinstance(item, list): @@ -461,29 +521,35 @@ def insert(self, table: Union[str, dict], fields: Union[list, dict]): params.append(subitem) self._params = tuple(params) else: - self.set_error(f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary") + self.set_error( + f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary" + ) return self return self def update(self, table: Union[str, dict], fields: Union[list, dict]): - if table == '' or table == {} or fields == [] or fields == {}: + if table == "" or table == {} or fields == [] or fields == {}: self.set_error(f"Empty table or fields in {inspect.stack()[0][3]} method") return self if isinstance(table, dict) or isinstance(table, str): table = self._prepare_aliases(table) else: - self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") + self.set_error( + f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary" + ) return self if isinstance(fields, list) or isinstance(fields, dict): - sets = '' + sets = "" for item in fields: sets += f" {self._prepare_field(item)} = ?," - sets = sets.rstrip(',') + sets = sets.rstrip(",") else: - self.set_error(f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary") + self.set_error( + f"Incorrect type of fields in {inspect.stack()[0][3]} method. Fields must be String, List or Dictionary" + ) return self self.reset() @@ -493,20 +559,29 @@ def update(self, table: Union[str, dict], fields: Union[list, dict]): return self - def join(self, table: Union[str, dict] = '', on: Union[str, tuple, list] = (), join_type: str = 'INNER'): + def join( + self, + table: Union[str, dict] = "", + on: Union[str, tuple, list] = (), + join_type: str = "INNER", + ): join_type = join_type.upper() - if join_type == '' or join_type not in self._JOIN_TYPES: - self.set_error(f"Empty join_type or is not allowed in {inspect.stack()[0][3]} method") + if join_type == "" or join_type not in self._JOIN_TYPES: + self.set_error( + f"Empty join_type or is not allowed in {inspect.stack()[0][3]} method" + ) return self - if table == '' or table == {}: + if table == "" or table == {}: self.set_error(f"Empty table in {inspect.stack()[0][3]} method") return self if isinstance(table, dict) or isinstance(table, str): self._sql += f" {join_type} JOIN {self._prepare_aliases(table)}" else: - self.set_error(f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary") + self.set_error( + f"Incorrect type of table in {inspect.stack()[0][3]} method. Table must be String or Dictionary" + ) return self if on: @@ -517,7 +592,9 @@ def join(self, table: Union[str, dict] = '', on: Union[str, tuple, list] = (), j elif isinstance(on, str): self._sql += f" ON {on}" else: - self.set_error(f"Incorrect type of on in {inspect.stack()[0][3]} method. On must be String, Tuple or List") + self.set_error( + f"Incorrect type of on in {inspect.stack()[0][3]} method. On must be String, Tuple or List" + ) return self self.set_error() @@ -525,11 +602,11 @@ def join(self, table: Union[str, dict] = '', on: Union[str, tuple, list] = (), j return self def drop(self, table: str, add_exists: bool = True): - if table == '': + if table == "": self.set_error(f"Empty table in {inspect.stack()[0][3]} method") return self - exists = 'IF EXISTS ' if add_exists else '' + exists = "IF EXISTS " if add_exists else "" self.reset() self._sql = f"DROP TABLE {exists}`{table}`" @@ -537,7 +614,7 @@ def drop(self, table: str, add_exists: bool = True): return self def truncate(self, table: str): - if table == '': + if table == "": self.set_error(f"Empty table in {inspect.stack()[0][3]} method") return self