In [1]:
import os
from datetime import datetime, timedelta
import random
import pandas as pd
import sqlite3
import shelve

In [2]:
def generateData(num=10):
    data = []
    for _ in range(num):
        timestamp = datetime.now() - timedelta(days=random.randint(1, 7))
        calls = 1
        tokens = random.randint(1000, 10000)
        spent = round(random.uniform(1.37, 5.62), 2)
        data.append({'timestamp': (timestamp, "DATETIME"), 'calls': (calls, "INTEGER"), 'tokens': (tokens, "INTEGER"), 'spent': (spent, "REAL")})

    return data

In [3]:
def dbAction(
    data=None,
    table="tokenUse",
    dbDelete=False,
    tableDrop=False,
    tableClear=False,
):
    # Function to register the adapter to convert datetime objects to ISO format
    def adapt_datetime_iso(val):
        return val.isoformat()

    # Function to connect to the SQLite database and create a cursor
    def dbConnect():
        connection = sqlite3.connect("data.db")
        cursor = connection.cursor()
        return connection, cursor

    # Function to close the cursor and disconnect from the SQLite database
    def dbClose():
        c.close()
        con.close()

    # Delete database file if specified
    if dbDelete:
        try:
            os.remove("data.db")
            print(f"Database `data.db` deleted.")
        except FileNotFoundError:
            print(f"ERROR: Database not found.")
        except Exception as e:
            print(f"An error occurred: {e}")
        return

    # Register the adapter
    sqlite3.register_adapter(datetime, adapt_datetime_iso)

    # Connect to the database
    con, c = dbConnect()

    def queryToDf():
        df = pd.read_sql_query(f"SELECT * FROM {table}", con)
        return df

    try:
        # Drop or clear the table if specified
        if tableDrop or tableClear:
            if tableDrop:
                c.execute(f"DROP TABLE IF EXISTS {table}")
                print(f"Database table `{table}` dropped.")
            elif tableClear:
                c.execute(f"DELETE FROM {table}")
                print(f"Database table `{table}` cleared.")
            con.commit()
            dbClose()
            return

        # If no data provided, fetch and print all existing data from the table or notify if no data exists
        if not data:
            # c.execute(f"SELECT * FROM {table}")
            # if c.fetchall():
            #     print(f"No new data detected. Table `{table}`:\n", c.fetchall())
            # else:
            #     print(f"No new data detected. Table `{table}` has no existing data.")
            existingTable = queryToDf()
            if existingTable.empty:
                print(f"No new data detected. Table `{table}` has no existing data.")
            else:
                print(f"No new data detected. Table `{table}`:\n")
            dbClose()
            return existingTable

        # Variable to store the column definitions
        colDefs = ", ".join(f"{k} {v[1]}" for k, v in data[0].items())
        # Query to create the table if it does not exist with the column definitions
        c.execute(f"CREATE TABLE IF NOT EXISTS {table} ({colDefs})")

        # Variable to store the column keys
        colKeys = ", ".join(data[0].keys())
        # Variable to store the data values
        dataVals = [[item[k][0] for k in item] for item in data]
        # Query to insert new data into the table using the column keys and data values
        c.executemany(
            f"INSERT INTO {table} ({colKeys}) VALUES ({', '.join('?' * len(data[0]))})",
            dataVals,
        )

        # Query to fetch and print all data from the table
        # c.execute(f"SELECT * FROM {table}")
        # print(f"New data successfully added. Table {table}:\n", c.fetchall())
        updatedTable = queryToDf()
        print(f"New data successfully added. Table {table}:\n")

        # Commit the changes and disconnect from the database
        con.commit()
        dbClose()
        return updatedTable

    # Handle any exceptions that occur
    except sqlite3.Error as e:
        print(f"An error occurred: {e}")
        con.rollback()
        dbClose()
        return

In [4]:
def dbToDf(table):
    con = sqlite3.connect("data.db")
    df = pd.read_sql_query(f"SELECT * FROM {table}", con)
    con.close()
    return df

In [5]:
def calcUsage():
    df = dbToDf("tokenUse")
    calls = df["calls"].sum()
    tokens = df["tokens"].sum()
    spent = df["spent"].sum()
    return {'calls': calls, 'tokens': tokens, 'spent': spent}

In [22]:
def shelfUsage(newData=None, shelfDelete=False):

    if shelfDelete:
        try:
            for file in os.listdir("."):
                if file.startswith("dataShelf"):
                    os.remove(file)
            print(f"All `dataShelf` files deleted.")
        except FileNotFoundError:
            print(f"ERROR: Shelf not found.")
        except Exception as e:
            print(f"An error occurred: {e}")
        return

    with shelve.open("dataShelf", writeback=True) as shelf:
        usage = []
        if not newData and not shelf:
            shelf["calls"] = {"curr": 0, "diff": 0}
            shelf["tokens"] = {"curr": 0, "diff": 0}
            shelf["spent"] = {"curr": 0, "diff": 0}
        else:
            for key in ["calls", "tokens", "spent"]:
                # Calculate the difference
                currVal = shelf[key]["curr"]
                diff = newData[key] - currVal

                # Update the shelf with new value and difference
                shelf[key]["curr"] = newData[key]
                shelf[key]["diff"] = diff

                if key == "spent":
                    usage.append(
                        {
                            key: {
                                "curr": round(shelf[key]["curr"], 2),
                                "diff": round(shelf[key]["diff"], 2),
                            }
                        }
                    )
                else:
                    usage.append(
                        {key: {"curr": shelf[key]["curr"], "diff": shelf[key]["diff"]}}
                    )

        if len(usage) == 0:
            return [
                {"calls": {"curr": 0, "diff": 0}},
                {"tokens": {"curr": 0, "diff": 0}},
                {"spent": {"curr": 0, "diff": 0}},
            ]
        return usage

In [7]:
dbData = generateData()
dbAction(data=dbData)

New data successfully added. Table tokenUse:



Unnamed: 0,timestamp,calls,tokens,spent
0,2024-03-06T13:25:53.625933,1,3907,5.44
1,2024-03-04T13:25:53.625933,1,2641,1.62
2,2024-03-07T13:25:53.625933,1,5402,3.35
3,2024-03-04T13:25:53.625933,1,1679,1.63
4,2024-03-06T13:25:53.625933,1,6940,1.77
5,2024-03-08T13:25:53.625933,1,1596,2.09
6,2024-03-10T13:25:53.625933,1,6193,3.54
7,2024-03-10T13:25:53.625933,1,4166,1.91
8,2024-03-06T13:25:53.625933,1,2188,2.3
9,2024-03-10T13:25:53.625933,1,6625,4.38
