# Database notebook
This notebook contains various implementations for database interactions.

### Imports + globals
In this section, modules are imported and global variables are defined. Execute this cell on startup or after a kernel reset only.

In [1]:
from model.database import Database
from collections.abc import Iterable, Callable

from global_variables.importer import *

Project root was set to C:/Users/Max Moons/Documents/GitHub/OSRS-Trade-Ledger/py/
Setting up RealtimePricesSnapshot class (allow_rbpi_download=True)...
Setting up ItemWikiMapping...
global_variables importer setup time: 281.2ms


### Establishing db connections
Below, various database connections are established. Read-only mode is highly recommended to avoid corrupting the databases.

In [2]:
# Timeseries database (opened read-only)
db_ts = Database(gp.f_db_timeseries, read_only=True)

# Local database (opened read-only)
db_local = Database(gp.f_db_local, read_only=True)

# Sandbox database (outdated, all tables, fewer rows per table); opened read-only
db = Database(gp.f_db_sandbox, read_only=True)

def print_rows(rows: Iterable, n_rows: int = None, row_printer: Callable = print):
    """ Print the first `n_rows` elements of `rows`, followed by the total number of rows.

    Parameters
    ----------
    rows : Iterable
        The rows to print. Must support len(), and slicing if `n_rows` is given (e.g. a list).
    n_rows : int, optional
        Maximum number of rows to print. If None (default), all rows are printed.
    row_printer : Callable, optional
        Called once with each printed row. Defaults to the built-in print.
    """
    selected = rows if n_rows is None else rows[:n_rows]
    for row in selected:
        row_printer(row)
    # Count the rows that were passed in; do not rely on a notebook-level global
    print(f'Total rows:{len(rows)}')

In [4]:
def printer(t: dict):
    """ Print row `t` with an 'item_name' entry, looked up in go.id_name via t's 'item_id', placed in front """
    named_row = {'item_name': go.id_name[t.get('item_id')]}
    # Entries of t are added after the name; an 'item_name' key in t would override the lookup
    named_row.update(t)
    print(named_row)

# Earlier variant of this query, kept for reference:
# transactions = db_ts.execute_select(table='realtime',t0=time.time()-86400*10, #print_result=True,
#                                           action='count', group_by='item_id', order_by='COUNT(*) DESC')

# Row count and average price per item_id since the given timestamp, largest count first
sql = """SELECT *, COUNT(*), AVG(price) FROM "realtime" WHERE timestamp>=?  GROUP BY item_id ORDER BY COUNT(*) DESC"""
# The lower timestamp bound is 10 days (10 * 86400 seconds) before now
transactions = db_ts.execute(
    sql,
    (int(time.time() - 10 * 86400),),
).fetchall()
print_rows(rows=transactions, n_rows=10, row_printer=printer)

{'item_name': 'Death rune', 'item_id': 560, 'timestamp': 1716262942, 'is_buy': 0, 'price': 118, 'COUNT(*)': 20850, 'AVG(price)': 114.53448441247002}
{'item_name': 'Blood rune', 'item_id': 565, 'timestamp': 1716262940, 'is_buy': 0, 'price': 205, 'COUNT(*)': 20644, 'AVG(price)': 208.80740166634374}
{'item_name': 'Revenant ether', 'item_id': 21820, 'timestamp': 1716262960, 'is_buy': 1, 'price': 175, 'COUNT(*)': 20620, 'AVG(price)': 171.33118331716778}
{'item_name': 'Chaos rune', 'item_id': 562, 'timestamp': 1716262953, 'is_buy': 1, 'price': 59, 'COUNT(*)': 20402, 'AVG(price)': 61.977110087246345}
{'item_name': 'Coal', 'item_id': 453, 'timestamp': 1716262956, 'is_buy': 1, 'price': 142, 'COUNT(*)': 20233, 'AVG(price)': 142.62793456234863}
{'item_name': 'Law rune', 'item_id': 563, 'timestamp': 1716262954, 'is_buy': 1, 'price': 118, 'COUNT(*)': 19872, 'AVG(price)': 120.07130636070853}
{'item_name': 'Rune arrow', 'item_id': 892, 'timestamp': 1716262916, 'is_buy': 0, 'price': 77, 'COUNT(*)': 19

# Test method

In [2]:
def test_method(my_method, n_exe: int, **kwargs):

    tag = '' if kwargs.get('tag') is None else f"Method: {kwargs.get('tag')} "
    del kwargs['tag']
    t_start = time.time()
    for _ in range(n_exe):
        my_method(**kwargs)
    delta_t = time.time()-t_start

    if delta_t < 10:
        s = f'N: {n_exe} Time taken: {1000*(delta_t):.1f}ms'
    elif delta_t < 60:
        s = f'N: {n_exe} Time taken: {delta_t:.1f}s'
    elif delta_t < 3600:
        s = f'N: {n_exe} Time taken: {delta_t//60:0>2}:{delta_t%60:0>2}'
    else:
        s = f'N: {n_exe} Time taken: {delta_t//3600:0>2}:{delta_t%3600//60:0>2}:{delta_t%60:0>2}'
    print(f"{tag}{s}\n\tOutput={my_method(**kwargs)}\n")

# Current time getters
Basic getters that return the current time as a datetime.datetime, in local time and in UTC

In [3]:
def loc_dtn() -> datetime.datetime:
    """ Current local date and time, as given by datetime.datetime.now() """
    now_local = datetime.datetime.now()
    return now_local

def utc_dtn() -> datetime.datetime:
    """ Return the current UTC time as a naive datetime.datetime (tzinfo is None)

    Equivalent to the deprecated datetime.datetime.utcnow(): the time is taken as an aware UTC
    datetime, after which tzinfo is stripped so the result remains a naive UTC datetime.
    """
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)


# datetime.datetime -> UNIX conversions + UNIX -> datetime.datetime conversions
Basic conversions between UNIX and datetime.datetime objects

In [4]:
def loc_unix_dt_1(unix) -> datetime.datetime:
    """ Build a local datetime.datetime from UNIX timestamp `unix` via datetime.datetime.fromtimestamp """
    local_dt = datetime.datetime.fromtimestamp(unix)
    return local_dt

def loc_unix_dt_2(unix) -> datetime.datetime:
    """ Build a local datetime.datetime by adding `unix` seconds to loc_0

    NOTE(review): loc_0 is defined outside this cell; presumably the local datetime at UNIX 0 -- confirm.
    """
    elapsed = datetime.timedelta(seconds=unix)
    return loc_0 + elapsed

def loc_unix_dt_3(unix) -> datetime.datetime:
    """ Convert the given UNIX timestamp to a local datetime.datetime (whole seconds)

    The date and the time of day are both taken from time.localtime(), so they refer to the same
    (local) timezone. Deriving the time of day from `unix` with modulo arithmetic would yield the
    UTC time of day, which does not match the local date outside of UTC.
    Fractional seconds in `unix` are discarded.
    """
    lt = time.localtime(unix)
    # datetime.datetime accepts at most 59 seconds, struct_time allows leap-second values
    return datetime.datetime(lt.tm_year, lt.tm_mon, lt.tm_mday, lt.tm_hour, lt.tm_min, min(lt.tm_sec, 59))

# Variant used by default for UNIX -> local datetime conversions
loc_unix_dt = loc_unix_dt_1

# Benchmark inputs: number of executions and the current UNIX timestamp in whole seconds
n = 1000000
unix_ = int(time.time())
# test_method(loc_unix_dt_1, n_exe=n, tag='loc_unix_dt_1', unix=unix_)
# test_method(loc_unix_dt_2, n_exe=n, tag='loc_unix_dt_2', unix=unix_)
# test_method(loc_unix_dt_3, n_exe=n, tag='loc_unix_dt_3', unix=unix_)

In [5]:
def loc_dt_unix_1(dt: datetime.datetime):
    """ UNIX timestamp for a local datetime.datetime, computed as the seconds elapsed since loc_0

    NOTE(review): loc_0 is defined outside this cell; presumably the local datetime at UNIX 0 -- confirm.
    """
    elapsed = dt - loc_0
    return elapsed.total_seconds()

def loc_dt_unix_2(dt: datetime.datetime):
    """ Convert a local datetime.datetime to a UNIX timestamp (whole seconds) using time.mktime

    The last tuple element (tm_isdst) is -1, which lets mktime determine whether daylight saving
    time applies to `dt`. A fixed value of 1 would force DST and can shift the result for
    datetimes outside of the DST period. Microseconds of `dt` are discarded.
    """
    return time.mktime((dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, 0, 0, -1))

def loc_dt_unix_3(dt: datetime.datetime):
    """ UNIX timestamp for a local datetime.datetime, obtained by passing dt.timetuple() to time.mktime """
    time_fields = dt.timetuple()
    return time.mktime(time_fields)

def loc_dt_unix_4(dt: datetime.datetime):
    """ UNIX timestamp for a local datetime.datetime, as returned by dt.timestamp() """
    unix_ts = dt.timestamp()
    return unix_ts

# Variant used by default for local datetime -> UNIX conversions
loc_dt_unix = loc_dt_unix_1

# Benchmark of the four loc_dt_unix variants (disabled)
# n, dt_ = 1000000, loc_dtn()
# test_method(loc_dt_unix_1, n_exe=n, tag='loc_dt_unix_1', dt=dt_)
# test_method(loc_dt_unix_2, n_exe=n, tag='loc_dt_unix_2', dt=dt_)
# test_method(loc_dt_unix_3, n_exe=n, tag='loc_dt_unix_3', dt=dt_)
# test_method(loc_dt_unix_4, n_exe=n, tag='loc_dt_unix_4', dt=dt_)

In [6]:
def utc_unix_dt_1(unix) -> datetime.datetime:
    """ Convert the given UNIX timestamp to a naive UTC datetime.datetime (tzinfo is None)

    Equivalent to the deprecated datetime.datetime.utcfromtimestamp(): the timestamp is converted
    to an aware UTC datetime, after which tzinfo is stripped so the result remains naive.
    """
    return datetime.datetime.fromtimestamp(unix, tz=datetime.timezone.utc).replace(tzinfo=None)

def utc_unix_dt_2(unix) -> datetime.datetime:
    """ Build a UTC datetime.datetime by adding `unix` seconds to utc_0

    NOTE(review): utc_0 is defined outside this cell; presumably the UTC datetime at UNIX 0 -- confirm.
    """
    elapsed = datetime.timedelta(seconds=unix)
    return utc_0 + elapsed

# Variant used by default for UNIX -> UTC datetime conversions
utc_unix_dt = utc_unix_dt_1

# Benchmark of the two utc_unix_dt variants (disabled)
# n, unix_ = 1000000, time.time()
# test_method(utc_unix_dt_1, n_exe=n, tag='utc_unix_dt_1', unix=unix_)
# test_method(utc_unix_dt_2, n_exe=n, tag='utc_unix_dt_2', unix=unix_)

In [7]:
# NOTE: in the timings printed below, utc_dt_unix_4 (not utc_dt_unix_1) is the fastest variant
def utc_dt_unix_1(dt: datetime.datetime):
    """ UNIX timestamp for a UTC datetime.datetime: dt.timestamp() shifted by delta_s_loc_utc

    NOTE(review): delta_s_loc_utc is defined outside this cell; presumably the local-UTC offset in
    seconds -- confirm.
    """
    local_stamp = dt.timestamp()
    return local_stamp + delta_s_loc_utc

def utc_dt_unix_2(dt: datetime.datetime):
    """ UNIX timestamp for a UTC datetime.datetime via time.mktime, with delta_s_loc_utc added to the seconds field

    NOTE(review): delta_s_loc_utc is defined outside this cell; presumably the local-UTC offset in
    seconds -- confirm.
    """
    # tm_isdst=-1 (last element) leaves the DST decision to mktime
    time_fields = (dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second+delta_s_loc_utc, 0, 0, -1)
    return time.mktime(time_fields)

def utc_dt_unix_3(dt: datetime.datetime):
    """ UNIX timestamp for a UTC datetime.datetime via time.mktime, with delta_s_loc_utc added to the result

    NOTE(review): delta_s_loc_utc is defined outside this cell; presumably the local-UTC offset in
    seconds -- confirm.
    """
    # tm_isdst=-1 (last element) leaves the DST decision to mktime
    mktime_stamp = time.mktime((dt.year, dt.month, dt.day, dt.hour, dt.minute, dt.second, 0, 0, -1))
    return mktime_stamp + delta_s_loc_utc

def utc_dt_unix_4(dt: datetime.datetime):
    """ UNIX timestamp for a UTC datetime.datetime, computed as the seconds elapsed since utc_0

    NOTE(review): utc_0 is defined outside this cell; presumably the UTC datetime at UNIX 0 -- confirm.
    """
    elapsed = dt - utc_0
    return elapsed.total_seconds()

# Variant used by default for UTC datetime -> UNIX conversions
utc_dt_unix = utc_dt_unix_4

# Benchmark inputs: number of executions and the current UTC datetime
n = 1000000
dt_utc_ = utc_dtn()

# Time every variant on the same input
test_method(utc_dt_unix_1, n_exe=n, tag='utc_dt_unix_1', dt=dt_utc_)
test_method(utc_dt_unix_2, n_exe=n, tag='utc_dt_unix_2', dt=dt_utc_)
test_method(utc_dt_unix_3, n_exe=n, tag='utc_dt_unix_3', dt=dt_utc_)
test_method(utc_dt_unix_4, n_exe=n, tag='utc_dt_unix_4', dt=dt_utc_)

Method: utc_dt_unix_1 N: 1000000 Time taken: 477.1ms
	Output=1714802618.337317

Method: utc_dt_unix_2 N: 1000000 Time taken: 657.7ms
	Output=1714802618.0

Method: utc_dt_unix_3 N: 1000000 Time taken: 680.8ms
	Output=1714802618.0

Method: utc_dt_unix_4 N: 1000000 Time taken: 338.6ms
	Output=1714802618.337318



In [8]:
# Shared benchmark inputs: execution count, current UNIX timestamp, local now and UTC now
n = 1000000
unix_ = time.time()
dt_ = datetime.datetime.now()
dt_utc_ = utc_dtn()

In [9]:
dtn = loc_dtn()
# dtn.fromtimestamp()

# Naive UTC now; replaces the deprecated datetime.datetime.utcnow() and prints in the same format
print(datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None))

2024-05-04 06:03:40.520928


In [10]:
# Print the struct_time of the current time in timezone tz_utc (tz_utc is defined outside this cell)
print(datetime.datetime.now(tz=tz_utc).timetuple())

time.struct_time(tm_year=2024, tm_mon=5, tm_mday=4, tm_hour=6, tm_min=3, tm_sec=40, tm_wday=5, tm_yday=125, tm_isdst=-1)
