In [None]:
if runWithoutInstalling := False:
    import sys

    sys.path.append("src")


import os
import random
import pickle
from datetime import datetime
from dotenv import load_dotenv

from asyncblobdict import AsyncBlobStore, CacheMode, ConcurrencyMode
from asyncblobdict.azure_blob_adapter import AzureBlobAdapter
from asyncblobdict.local_file_adapter import LocalFileAdapter

load_dotenv()

# Config
AZURE_CONN_STR = os.getenv("AZURE_CONN_STR")
AZURE_CONTAINER = os.getenv("AZURE_CONTAINER")
print("Azure Container:", AZURE_CONTAINER)
LOCAL_BASE_PATH = "./local_blob_storage"
LOCAL_CONTAINER = "test_container"

# create local base path if not exists
os.makedirs(LOCAL_BASE_PATH, exist_ok=True)


# Sync behavior
BEHAVIOR = "overwrite"  # Options: "skip", "overwrite", "raise"


async def run_demo(backend: str):
    """
    Run AsyncBlobStore demo for the given backend ("azure" or "local").
    """
    if backend == "azure":
        assert AZURE_CONN_STR, "AZURE_CONN_STR not set in environment"
        assert AZURE_CONTAINER, "AZURE_CONTAINER not set in environment"
        adapter = AzureBlobAdapter.from_connection_string(AZURE_CONN_STR)
        container_name = AZURE_CONTAINER
        label = "[Azure]"
    elif backend == "local":
        adapter = LocalFileAdapter(LOCAL_BASE_PATH)
        container_name = LOCAL_CONTAINER
        label = "[Local]"
    else:
        raise ValueError("Backend must be 'azure' or 'local'")

    store = AsyncBlobStore(
        adapter,
        container_name,
        cache_mode=CacheMode.WRITE_THROUGH,
        concurrency_mode=ConcurrencyMode.ETAG,
    )

    async with store:
        # Store JSON config
        config_data = {
            "learning_rate": 0.01,
            "layers": [64, 128, 256],
            "activation": "relu",
            "created_at": datetime.utcnow(),
        }
        await store.set_json("demo/ml_config", config_data)
        print(f"{label} Stored config JSON.")

        loaded_config = await store.get_json("demo/ml_config")
        print(f"{label} Loaded config:", loaded_config)

        # Store binary model
        dummy_model = {
            "weights": [0.1, 0.2, 0.3],
            "bias": 0.5,
            "random_seed": random.randint(0, 1000),
        }
        model_bytes = pickle.dumps(dummy_model)
        await store.set_binary("demo/model_v1.pkl", model_bytes)
        print(f"{label} Stored model binary.")

        loaded_model_bytes = await store.get_binary("demo/model_v1.pkl")
        loaded_model = pickle.loads(loaded_model_bytes)
        print(f"{label} Loaded model:", loaded_model)

        # Test concurrency conflict
        try:
            # Manually get the blob name and etag
            blob_name = store._format._blob_name_json("demo/ml_config")
            old_etag = store._cache[blob_name].etag
            assert old_etag is not None, "ETag should not be None for conflict test"

            # Simulate external modification
            await store._core.set_bytes(
                blob_name,
                store._format.json_serializer.serialize({"external": "change"}),
                etag=None,  # Bypass ETag check
            )

            # Now try to set with stale ETag
            await store._core.set_bytes(
                blob_name,
                store._format.json_serializer.serialize({"new": "data"}),
                etag=old_etag,
            )
        except Exception as e:
            print(f"{label} Concurrency error caught:", e)

        # List keys
        keys = await store.list_keys()
        print(f"{label} Keys in store:", keys)

        # Delete a key
        await store.delete("ml_config")
        print(f"{label} Deleted ml_config.")

        # Test WRITE_BACK + sync
        store._core.cache_mode = CacheMode.WRITE_BACK
        await store.set_json(
            "demo/batched_config",
            {"batch": [random.randint(0, 100) for _ in range(10)]},
        )
        await store.set_binary(
            "demo/batched_model.pkl", bytes(random.getrandbits(8) for _ in range(1024))
        )
        print(f"{label} Changes cached, now syncing...")
        await store.sync(etag_behavior=BEHAVIOR)
        print(f"{label} Sync complete.")


# ---------------------------
# Run both backends
# ---------------------------
print("=== Azure Backend Demo ===")
await run_demo("azure")

print("\n=== Local Backend Demo ===")
await run_demo("local")

In [None]:
!pytest -s