In [17]:
import os, random
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
from azure.mgmt.resource import ResourceManagementClient
from azure.mgmt.storage import StorageManagementClient
from azure.mgmt.storage.models import BlobContainer

In [18]:
# Load key/value pairs from a .env file into the process environment so the
# lookups below can see them.
load_dotenv()

# Credential object handed to the Azure management client built further down.
credential = DefaultAzureCredential()

# Every setting below is required: indexing the environment mapping raises
# KeyError as soon as one of the variables is missing.
env = os.environ

# Where the resources live.
subscription_id = env["AZURE_SUBSCRIPTION_ID"]
RESOURCE_GROUP_NAME = env["AZURE_RESOURCE_GROUP_NAME"]
LOCATION = env["LOCATION"]

# What gets created.
STORAGE_ACCOUNT_NAME = env["AZURE_STORAGE_ACCOUNT_NAME"]
CONTAINER_NAME_ONE = env["CONTAINER_NAME_ONE"]
CONTAINER_NAME_TWO = env["CONTAINER_NAME_TWO"]
CONTAINER_NAME_THREE = env["CONTAINER_NAME_THREE"]

In [19]:
# Provision the storage account, or reuse it when an earlier run of this
# notebook already created it, so the cell survives Restart Kernel -> Run All.
storage_client = StorageManagementClient(credential, subscription_id)

# Ask the service whether the account name is still free before creating it.
available_storage_accounts = storage_client.storage_accounts.check_name_availability(
    {"name": STORAGE_ACCOUNT_NAME}
)

if available_storage_accounts.name_available:
    print(f"Creating storage account: {STORAGE_ACCOUNT_NAME}")
    storage_async_operation = storage_client.storage_accounts.begin_create(
        RESOURCE_GROUP_NAME,
        STORAGE_ACCOUNT_NAME,
        {
            "location": LOCATION,
            "sku": {"name": "Standard_LRS"},
            "kind": "StorageV2",
            "enable_https_traffic_only": True,
        },
    )
    # begin_create returns a poller; result() blocks until provisioning ends.
    storage_account = storage_async_operation.result()
    print(f"Storage account {STORAGE_ACCOUNT_NAME} created.")
else:
    # "Not available" is also what the service answers once this notebook has
    # created the account on a previous run. Look for the account in our own
    # resource group before telling the user to pick another name.
    # NOTE: this lookup raises if the resource group itself does not exist.
    storage_account = next(
        (
            account
            for account in storage_client.storage_accounts.list_by_resource_group(
                RESOURCE_GROUP_NAME
            )
            if account.name == STORAGE_ACCOUNT_NAME
        ),
        None,
    )

    if storage_account is not None:
        print(
            f"Storage account {STORAGE_ACCOUNT_NAME} already exists in resource group "
            f"{RESOURCE_GROUP_NAME}; reusing it."
        )
    else:
        # The name is taken by someone else (or is invalid): keep the original
        # guidance and add the service's own explanation. storage_account stays
        # None so later cells do not pick up a stale value from an old run.
        print(f"Storage account name {STORAGE_ACCOUNT_NAME} is not available. Please choose a different name.")
        print(f"Reason reported by Azure: {available_storage_accounts.message}")

Creating storage account: etlpipelinestorageacct
Storage account etlpipelinestorageacct created.


In [20]:
# Create the first blob container in the storage account. An empty
# BlobContainer() is sent as the request body, so no container properties are
# set explicitly here.
container_01 = storage_client.blob_containers.create(
    resource_group_name=RESOURCE_GROUP_NAME,
    account_name=STORAGE_ACCOUNT_NAME,
    container_name=CONTAINER_NAME_ONE,
    blob_container=BlobContainer(),
)

print(f"Blob container {CONTAINER_NAME_ONE} created in storage account {STORAGE_ACCOUNT_NAME}.")

Blob container bronze-layer created in storage account etlpipelinestorageacct.
