In [1]:
import os
import json 
import logging 
import time 
import datetime 
from functools import wraps
from typing import Dict, Tuple, List, Optional
from pathlib import Path 

import nbformat
from nbclient import NotebookClient
from google.api_core.exceptions import NotFound
from google.cloud import storage 
from googleapiclient import discovery
import google.auth 


from dotenv import load_dotenv
# Populate os.environ from a local .env file (python-dotenv) before reading settings.
load_dotenv()
# Load service-account credentials from the file named by GOOGLE_APPLICATION_CREDENTIALS.
# The call yields a (credentials, project id) pair; a missing variable raises KeyError.
CREDENTIALS, PROJECT_ID = google.auth.load_credentials_from_file(os.environ['GOOGLE_APPLICATION_CREDENTIALS'])
# Name of the storage bucket that StorageClient binds to.
BUCKET_NAME = os.environ["NEXT_PUBLIC_STORAGE_BUCKET_NAME"]

class StorageClient:
    """Thin wrapper around a Google Cloud Storage client bound to one bucket.

    The client is built from the module-level ``PROJECT_ID`` and
    ``CREDENTIALS`` and the bucket handle from ``BUCKET_NAME``.
    """

    def __init__(self) -> None:
        self.client = storage.Client(project=PROJECT_ID, credentials=CREDENTIALS)
        self.bucket = self.client.bucket(BUCKET_NAME)

    def get_blob(
        self, name: str, cur_dtime: Optional[datetime.datetime] = None
    ) -> Tuple["storage.Blob", bool, Optional[float]]:
        """Look up a blob by name and report whether it exists and how old it is.

        Args:
            name: Object name (path) inside the bucket.
            cur_dtime: Reference "now" used to compute the age. Defaults to the
                current time in UTC. NOTE(review): ``blob.time_created`` is
                presumably timezone-aware, so pass an aware datetime; mixing
                naive and aware datetimes raises ``TypeError``.

        Returns:
            A ``(blob, exists, age_seconds)`` tuple. ``exists`` is False and
            ``age_seconds`` is None when the object is not found in the bucket.
        """
        if cur_dtime is None:
            cur_dtime = datetime.datetime.now(datetime.timezone.utc)

        blob = self.bucket.blob(name)
        try:
            # bucket.blob() only builds a local handle; reload() fetches the
            # object's metadata and raises NotFound if it does not exist.
            blob.reload()
        except NotFound:
            return blob, False, None

        # Kept outside the try so only the metadata fetch is guarded.
        age_seconds = (cur_dtime - blob.time_created).total_seconds()
        return blob, True, age_seconds

# Shared StorageClient instance; later cells reach the client and bucket through it.
sc = StorageClient()

In [24]:
"""
1. List all JSON objects under the schema prefix.
2. Find the newest blob from the timestamp encoded in each blob's file name.
3. Queue all other blobs for deletion.

"""

from pathlib import Path 
from typing import List 
from google.cloud.storage import Blob

schema_name = "inner"  # testing
# The trailing slash restricts the listing to this schema's own "directory".
# A bare "schemas/inner" prefix would also match sibling schemas such as
# "schemas/inner_v2/...", and their blobs would end up queued for deletion.
schema_prefix = f"schemas/{schema_name}/"

# Only the timestamp-named JSON files are considered.
blobs: List[Blob] = [
    b for b in sc.client.list_blobs(sc.bucket, prefix=schema_prefix)
    if b.name.endswith(".json")
]
# Maps the datetime parsed from each file name (its ISO-8601 stem, taken as the
# time the blob was created at) to the blob.
blob_created_at_map = {
    datetime.datetime.fromisoformat(Path(b.name).stem): b for b in blobs
}
# default=None keeps the cell from failing with StopIteration when nothing matches;
# in that case blob_newest is None and there is nothing to delete.
dt_max = max(blob_created_at_map, default=None)
blob_newest = blob_created_at_map.get(dt_max)
# Iterate the full listing rather than the map so blobs whose timestamps collide
# (and were therefore collapsed in the map) are still queued for deletion.
blobs_delete = [b for b in blobs if b is not blob_newest]
print(blob_newest)
print(blobs_delete)

# blob, exists, age = sc.get_blob("Fi*.json", datetime.datetime.now(datetime.timezone.utc))
# print(exists)

<Blob: beanstalk-analytics-test-bucket, schemas/inner/2022-09-09T20:50:37.574142+00:00.json, 1662756678401434>
[<Blob: beanstalk-analytics-test-bucket, schemas/inner/2022-09-09T20:50:33.143508+00:00.json, 1662756658778489>]


In [15]:
# datetime.datetime.now(datetime.timezone.utc).isoformat()

'2022-09-09T20:50:33.143508+00:00'

In [18]:
# datetime.datetime.now(datetime.timezone.utc).isoformat()