In [1]:
import os
import json
import time
import shelve
import requests
from dotenv import load_dotenv

from SpecialPrint import printSp, Style, Color

In [2]:
load_dotenv()

True

In [12]:
def fetchData():
    response = requests.get(os.getenv("USAGE_URL"))
    if response.status_code == 200:
        return response.json()
    return None

In [14]:
def updateJsonData():
    apiData = fetchData()

    with open('usage.json', 'r') as file:
        oldData = json.load(file)

    oldDataIds = set(item['created_at'] for item in oldData)
    newData = [item for item in apiData if item['created_at'] not in oldDataIds]
    
    if newData or len(apiData) != len(oldData):
        updatedData = oldData + newData
        with open('usage.json', 'w') as file:
            json.dump(updatedData, file)
        print(f"Added {len(newData)} new {"entry" if len(newData) == 1 else "entries"}.")
        return updatedData
    else:
        print("No new data entries detected.")
        return oldData

# Example usage
updateJsonData()

Added 40 new entries.


[{'created_at': '2024-02-23T03:30:46.894119+00:00',
  'team_id': 1244352,
  'method': 'POST',
  'response_status': 200,
  'path': '/timegpt_multi_series',
  'input_tokens': 144,
  'output_tokens': 36,
  'finetune_tokens': 0,
  'price_of_request_in_cents': 29},
 {'created_at': '2024-02-23T03:32:13.978263+00:00',
  'team_id': 1244352,
  'method': 'POST',
  'response_status': 200,
  'path': '/timegpt_multi_series',
  'input_tokens': 144,
  'output_tokens': 6,
  'finetune_tokens': 0,
  'price_of_request_in_cents': 19},
 {'created_at': '2024-02-23T03:33:22.927095+00:00',
  'team_id': 1244352,
  'method': 'POST',
  'response_status': 200,
  'path': '/timegpt_multi_series',
  'input_tokens': 144,
  'output_tokens': 36,
  'finetune_tokens': 0,
  'price_of_request_in_cents': 29},
 {'created_at': '2024-03-06T21:06:31.26001+00:00',
  'team_id': 1244352,
  'method': 'POST',
  'response_status': 500,
  'path': '/timegpt_multi_series_anomalies',
  'input_tokens': None,
  'output_tokens': None,
  'fi

In [16]:
def findTotals():
    with open("usage.json", "r") as file:
        data = json.load(file)

    calls = len(data)
    tokens = sum(
        (item["input_tokens"] if item["input_tokens"] is not None else 0)
        + (item["output_tokens"] if item["output_tokens"] is not None else 0)
        for item in data
    )
    spent = sum(item["price_of_request_in_cents"] for item in data) / 100

    return calls, tokens, spent

findTotals()

(87, 287135, 500.33)

In [19]:
# create a function that will update shelf data

def updateShelfData():
    totalsData = findTotals()
    shelfData = {
        "calls": {"curr": 0, "diff": 0},
        "tokens": {"curr": 0, "diff": 0},
        "spent": {"curr": 0, "diff": 0},
    }

    with shelve.open("usage") as shelf:
        for key in ["calls", "tokens", "spent"]:
            if key not in shelf:
                shelf[key] = {"curr": 0, "diff": 0}
            else:
                currVal = shelf[key]["curr"]
                diffVal = totalsData[key] - currVal
                shelf[key] = {"curr": totalsData[key], "diff": diffVal}

            shelf[key]["curr"] = shelf[key]["curr"]
            shelf[key]["diff"] = shelf[key]["diff"]

        shelfData["calls"] = shelf["calls"]
        shelfData["tokens"] = shelf["tokens"]
        shelfData["spent"] = shelf["spent"]
    
    return shelfData

updateShelfData()

TypeError: tuple indices must be integers or slices, not str

In [5]:
def updateLocalData(filepath, newData=None):
    newDataPresent = newData is not None and len(newData) > 0
    usageData = {
        "calls": {"curr": 0, "diff": 0},
        "tokens": {"curr": 0, "diff": 0},
        "spent": {"curr": 0, "diff": 0},
    }

    if newDataPresent:
        try:
            with open(filepath, "r") as file:
                localData = json.load(file)
        except (FileNotFoundError, json.JSONDecodeError):
            localData = []

        localDataCreatedAt = {item["created_at"]: item for item in localData}

        for item in newData:
            localDataCreatedAt[item["created_at"]] = item

        with open(filepath, "w") as file:
            json.dump(list(localDataCreatedAt.values()), file, indent=4)

    with shelve.open("usage") as shelf:
        for key in ["calls", "tokens", "spent"]:
            if key not in shelf:
                shelf[key] = {"curr": 0, "diff": 0}

            if newDataPresent and key in newData:
                currVal = shelf[key]["curr"]
                diffVal = newData[key] - currVal
                shelf[key] = {"curr": newData[key], "diff": diffVal}

            usageData[key]["curr"] = shelf[key]["curr"]
            usageData[key]["diff"] = shelf[key]["diff"]

    shouldUpdate = newDataPresent or any(
        value["curr"] != 0 or value["diff"] != 0 for value in usageData.values()
    )

    return usageData, shouldUpdate

In [4]:
def printAndClear(message, sleepTime=1, newLine=False):
    if not newLine:
        print(f"\r{message}", end="", flush=True)
        time.sleep(sleepTime)
    else:
        print(f"\r{message}\n\n", end="")

In [7]:
def readData():
  with open("usage.json", "r") as file:
    data = json.load(file)
    calls = len(data)
    tokens = sum(
      (item["input_tokens"] if item["input_tokens"] is not None else 0)
      + (item["output_tokens"] if item["output_tokens"] is not None else 0)
      for item in data
    )
    spent = sum(item["price_of_request_in_cents"] for item in data) / 100
    return calls, tokens, spent

In [6]:
def refreshUsageData():
  try:
    print("\033[?25l", end="")
    startTime = time.time()
    tryCount = 1
    checkCount = 4
    while time.time() - startTime < 20:
      # newData = fetchData()
      # if newData is not None:
      #   try:
      #     with open("./usage.json", "r") as file:
      #       localData = json.load(file)
      #   except (FileNotFoundError, json.JSONDecodeError):
      #     localData = []

      #   if len(newData) > len(localData):
      #     usageData, shouldUpdate = updateLocalData("./usage.json", newData)
      #     if shouldUpdate:
      #       printAndClear("Token usage successfully updated with new data.", newLine=True)
      #       showTokenUsage(usageData)
      #       return
      #     else:
      #       printAndClear(
      #         "No new usage data detected. Token usage remains the same.", newLine=True
      #       )
      #       showTokenUsage(usageData)
      #     else:
      #       printAndClear(
      #         f"Checking for new data || Attempt {tryCount} of {checkCount} .        "
      #       )
      #       printAndClear(
      #         f"Checking for new data || Attempt {tryCount} of {checkCount} . .      "
      #       )
      #       printAndClear(
      #         f"Checking for new data || Attempt {tryCount} of {checkCount} . . .    "
      #       )
      #       printAndClear(
      #         f"Checking for new data || Attempt {tryCount} of {checkCount} . . . .  "
      #       )
      #       printAndClear(
      #         f"Checking for new data || Attempt {tryCount} of {checkCount} . . . . ."
      #       )
      #       tryCount += 1
      # else:
      #   printAndClear("Failed to fetch new data. Retrying...", 5)
      #   checkCount -= 1


  finally:
    print("\033[?25h", end="")

In [8]:
def borderPrint(text):
  padding = 10
  width = padding * 2 + len(text)

  border = printSp(f"{'= ' * int(width / 4)}", color=Color.BRIGHT_WHITE)

  print(border)
  print("\n" + text.center(width) + "\n")
  print(border)

In [9]:
def displayUsageData(data):
  printStr = ""
  for key in data:
    printStr += f"{key.capitalize()}: {data[key]['diff']}"
  return printStr

In [10]:
def showTokenUsage(data):
  calls, tokens, spent = readData()

  callsTxt = printSp("API CALLS:  ", color=Color.BRIGHT_WHITE)
  tokensTxt = printSp("TOKENS:  ", color=Color.BRIGHT_WHITE)
  spentTxt = printSp("SPENT:  ", color=Color.BRIGHT_WHITE)
  callsVal = printSp(f"{calls}", style=Style.BOLD, color=Color.BRIGHT_RED)
  tokensVal = printSp(f"{tokens}", style=Style.BOLD, color=Color.BRIGHT_RED)
  spentVal = printSp(f"{spent:.2f}", style=Style.BOLD, color=Color.BRIGHT_RED)

  divider = printSp("   ||   ", style=Style.BOLD, color=Color.BRIGHT_WHITE)

  usageString1 = f"{callsTxt}{callsVal}{divider}{tokensTxt}{tokensVal}{divider}{spentTxt}{spentVal}"
  usageString2 = displayUsageData(data)
  
  printStr = f"{usageString1}\n{usageString2}"

  borderPrint(printStr)

In [11]:
refreshUsageData()

No new usage data detected. Token usage remains the same.

[?25h

UnboundLocalError: cannot access local variable 'usageData' where it is not associated with a value