SQLiteDatabase
wzelazo edited this page Jul 6, 2018
·
6 revisions
- database name (default: 'taurus.db')
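The get_sql_from_file function reads the SQL script named script_name (the file 'SQL/<script_name>.sql' shipped with the package) and returns its contents as a string.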
def get_sql_from_file(self, script_name):
    """Return the text of the bundled SQL script ``SQL/<script_name>.sql``.

    The file is looked up relative to the directory of this module.

    :param script_name: base name of the script, without the ``.sql`` suffix
    :return: the script contents decoded as ASCII
    :raises FileNotFoundError: if the script cannot be located or loaded
    :raises UnicodeDecodeError: if the script contains non-ASCII bytes
    """
    # The snippet previously called ``pkg_reorigins.reorigin_stream``, which
    # does not exist (it looks like a mangled ``pkg_resources.resource_stream``).
    # ``pkgutil.get_data`` is the standard-library equivalent and returns the
    # bytes directly, so no stream is left open.
    import pkgutil

    resource = 'SQL/' + script_name + '.sql'
    raw = pkgutil.get_data(__name__, resource)
    if raw is None:
        # get_data returns None when the module's loader cannot serve data.
        raise FileNotFoundError(resource)
    return raw.decode('ascii')
def build_sql(self, script_string, args=None):
    """Fill the ``str.format`` fields of *script_string* with values from *args*.

    :param script_string: SQL text containing ``{name}`` replacement fields
    :param args: mapping of field name to value; ``None`` (the default) or an
        empty mapping supplies no values
    :return: the formatted SQL string
    :raises KeyError: if *script_string* names a field missing from *args*
    """
    # NOTE(review): values are spliced into the SQL text verbatim, with no
    # quoting or escaping -- do not pass untrusted input through this method.
    return script_string.format(**(args or {}))
def commit(self):
    """Commit the current transaction on ``self.db_connection``."""
    connection = self.db_connection
    connection.commit()
The one function executes the chosen SQL script with the given argument values and returns a single record.
def one(self, script_name, args=None):
    """Run the script *script_name* via ``self.do`` and return its first row.

    :param script_name: name of the SQL script, forwarded to ``self.do``
    :param args: parameters forwarded to ``self.do``; ``None`` (the default)
        is forwarded as a fresh empty dict
    :return: the result of ``fetchone()`` on the cursor returned by
        ``self.do``, or ``None`` when ``self.do`` returns no cursor
    """
    # A new dict per call avoids sharing one mutable default between calls.
    cursor = self.do(script_name, {} if args is None else args)
    if cursor is None:
        return None
    return cursor.fetchone()
def transaction(self, script_name, data):
    """Execute the script *script_name* once per item of *data*, then commit.

    :param script_name: name of the SQL script, resolved with
        ``self.get_sql_from_file``
    :param data: iterable of parameter sets handed to ``executemany``
    :raises TypeError: if *data* is not iterable
    """
    # Fixed: this used to call the misspelled ``get_sql_form_file``, which
    # raised AttributeError on every call.
    sql_string = self.get_sql_from_file(script_name)
    # An explicit check instead of ``assert`` so it still runs under ``-O``.
    if not hasattr(data, '__iter__'):
        raise TypeError('data must be an iterable of parameter sets')
    self.db_connection.executemany(sql_string, data)
    self.commit()
The do function executes the chosen SQL script, supplying the given argument values to it.
def do(self, script_name, args=None):
    """Execute the SQL script *script_name* and commit.

    How the script is run depends on its text:

    * no ``;`` -- treated as a single statement, executed with *args*; the
      resulting cursor is returned so the caller can fetch output;
    * contains ``;`` and ``:`` -- split on ``;`` and each non-empty statement
      is executed with *args*; nothing is returned;
    * contains ``;`` but no ``:`` -- run as a whole with ``executescript``,
      without *args*; nothing is returned.

    :param script_name: name of the SQL script, resolved with
        ``self.get_sql_from_file``
    :param args: parameters passed to ``execute``; ``None`` (the default)
        is treated as an empty dict
    :return: the cursor for a single-statement script, otherwise ``None``
    """
    params = {} if args is None else args
    # Fixed: this used to call the misspelled ``get_sql_form_file``.
    sql_string = self.get_sql_from_file(script_name)

    if ';' not in sql_string:
        cursor = self.db_connection.execute(sql_string, params)
        self.commit()
        return cursor

    if ':' in sql_string:
        # Presumably ``:`` signals named placeholders, which need ``execute``
        # rather than ``executescript`` -- TODO confirm.
        # NOTE(review): a plain split also cuts at any ';' inside a string
        # literal or comment of the script.
        for query in sql_string.split(';'):
            statement = query.strip()
            # Skip the empty fragment that follows the final ';'.
            if statement:
                self.db_connection.execute(statement, params)
    else:
        self.db_connection.executescript(sql_string)
    self.commit()
    return None
def table_exists(self, dataset_name):
    """Report whether a table named *dataset_name* exists in the database.

    :param dataset_name: table name to look up in ``sqlite_master``
    :return: ``True`` if at least one matching table is listed, else ``False``
    """
    cursor = self.db_connection.cursor()
    try:
        row = cursor.execute(
            "SELECT count(*) FROM sqlite_master WHERE type='table' AND name=?",
            [dataset_name]).fetchone()
    finally:
        # Release the cursor explicitly instead of leaving it open.
        cursor.close()
    return row[0] != 0