# In [0]:
from azure.storage.blob import BlobServiceClient
from azure.data.tables import TableServiceClient
from azure.core.credentials import AzureNamedKeyCredential
import logging
import os
import uuid
from datetime import datetime

# Logging setup: every run writes to its own timestamped file inside a
# dedicated log directory.
# NOTE(review): logging.basicConfig() has no effect when the root logger
# already has handlers - confirm the notebook runtime leaves it unconfigured.
log_dir = "/dbfs/tmp/logs"
os.makedirs(log_dir, exist_ok=True)

_run_stamp = datetime.now().strftime('%Y%m%d_%H%M%S')
log_file = os.path.join(log_dir, f"myapp_{_run_stamp}.log")

logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    filename=log_file,
)
logger = logging.getLogger(__name__)

# Function to save image data to a file
def save_image_data_to_file(image_data, file_path):
    """Write raw image bytes to ``file_path``.

    The file is opened in binary write mode, so an existing file at that
    path is overwritten.

    Args:
        image_data: Bytes-like content to write.
        file_path: Destination path of the file.

    Raises:
        Exception: Any error raised while writing (or while logging the
            success message) is logged and then re-raised unchanged.
    """
    try:
        with open(file_path, mode="wb") as image_file:
            image_file.write(image_data)
        logger.info(f"File saved successfully: {file_path}")
    except Exception as write_error:
        # Log for the run's log file, then let the caller decide what to do.
        logger.error(f"Failed to save file {file_path}: {write_error}")
        raise

# Function to check if a file is an image
def is_image_file(file_name):
    """Return whether ``file_name`` has a recognised image extension.

    The check is case-insensitive and looks only at the end of the name; the
    file content is never inspected.

    Args:
        file_name: File or blob name to test.

    Returns:
        True if the name ends with one of the supported image extensions,
        False otherwise.
    """
    image_extensions = (".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp")

    # str.endswith accepts a tuple, so one call covers every extension.
    if file_name.lower().endswith(image_extensions):
        return True

    # Report the miss and let the caller decide what to do with it. Ending
    # the notebook from inside a predicate would make it unusable as a
    # filter: a single non-image name would abort the whole run.
    # The logger is looked up by module name so the function does not depend
    # on a module-level variable being defined first.
    logging.getLogger(__name__).warning(f"File is not an image: {file_name}")
    return False

# Function to check if a file has already been processed
def is_file_processed(file_name, table_client):
    """Return whether ``file_name`` is already recorded as processed.

    Looks up the entity with partition key ``"processed_files"`` and row key
    ``file_name`` through ``table_client``.

    Args:
        file_name: Name used as the row key of the tracking entity.
        table_client: Table client exposing ``get_entity``.

    Returns:
        True if the entity exists, False if the service reports it as not
        found.

    Raises:
        Exception: Any failure other than "not found" (for example an
            authentication or network error) propagates, so that an outage
            is not mistaken for "not processed yet".
    """
    # Imported locally to keep the block self-contained; azure-core is
    # already a dependency of this file.
    from azure.core.exceptions import ResourceNotFoundError

    try:
        table_client.get_entity(partition_key="processed_files", row_key=file_name)
    except ResourceNotFoundError:
        return False
    return True

# Function to mark a file as processed
def mark_file_as_processed(file_name, table_client):
    """Record ``file_name`` as processed through ``table_client``.

    Upserts an entity with partition key ``"processed_files"``, row key
    ``file_name`` and a ``ProcessedTime`` field holding the current UTC time
    as an ISO-8601 string (naive, i.e. without a timezone offset).

    Args:
        file_name: Name used as the row key of the tracking entity.
        table_client: Table client exposing ``upsert_entity``.
    """
    processed_at = datetime.utcnow().isoformat()
    table_client.upsert_entity({
        "PartitionKey": "processed_files",
        "RowKey": file_name,
        "ProcessedTime": processed_at,
    })
    logger.info(f"File marked as processed: {file_name}")

# Notebook parameter: name of the blob container to read from
dbutils.widgets.text("containerName", "")
CONTAINER_NAME = dbutils.widgets.get("containerName")

# Stop early when the container name is missing or not purely alphanumeric.
# NOTE(review): str.isalnum() is False for names containing hyphens - confirm
# that rejecting such names is intended.
if not (CONTAINER_NAME and CONTAINER_NAME.isalnum()):
    _container_name_error = "Parameter 'containerName' is empty or contains invalid characters!"
    logger.error(_container_name_error)
    raise ValueError(_container_name_error)

# Secret scopes: account names are read from one scope, account keys from
# another.
# NOTE(review): confirm that using two different scopes is intended.
_ACCOUNT_NAME_SCOPE = "dear-keyvault-scope"
_ACCOUNT_KEY_SCOPE = "my-keyvault-scope"

# Azure Blob Storage configuration
STORAGE_ACCOUNT_NAME = dbutils.secrets.get(scope=_ACCOUNT_NAME_SCOPE, key="dear-storage-account")
STORAGE_ACCOUNT_KEY = dbutils.secrets.get(scope=_ACCOUNT_KEY_SCOPE, key="storage-account-key")
INPUT_FOLDER = "input/"  # Blob name prefix under which new files are uploaded

# Azure Table Storage configuration (tracks which files were processed)
TABLE_STORAGE_ACCOUNT_NAME = dbutils.secrets.get(scope=_ACCOUNT_NAME_SCOPE, key="dear-tablestorage-account")
TABLE_STORAGE_ACCOUNT_KEY = dbutils.secrets.get(scope=_ACCOUNT_KEY_SCOPE, key="table-storage-account-key")
TABLE_NAME = "ProcessedFiles"

# Blob service client, authenticated with the storage account key
_blob_account_url = f"https://{STORAGE_ACCOUNT_NAME}.blob.core.windows.net"
blob_service_client = BlobServiceClient(
    account_url=_blob_account_url,
    credential=STORAGE_ACCOUNT_KEY,
)

# Table service client, authenticated with a named-key credential built from
# the table storage account name and key
_table_endpoint = f"https://{TABLE_STORAGE_ACCOUNT_NAME}.table.core.windows.net"
table_credential = AzureNamedKeyCredential(
    name=TABLE_STORAGE_ACCOUNT_NAME,
    key=TABLE_STORAGE_ACCOUNT_KEY,
)
table_service_client = TableServiceClient(
    endpoint=_table_endpoint,
    credential=table_credential,
)

# Client bound to the table that tracks processed files
table_client = table_service_client.get_table_client(TABLE_NAME)

# List the blobs under the input prefix. The prefix is passed to the service
# (name_starts_with) so the rest of the container is not enumerated and then
# discarded on the client side.
container_client = blob_service_client.get_container_client(CONTAINER_NAME)
blobs = list(container_client.list_blobs(name_starts_with=INPUT_FOLDER))

# Keep only the blobs whose name has an image extension
image_blobs = [blob for blob in blobs if is_image_file(blob.name)]

# An input folder without any image is logged and raised as an error
if not image_blobs:
    logger.error(f"No image files found in folder {INPUT_FOLDER} of container {CONTAINER_NAME}")
    raise Exception(f"No image files found in folder {INPUT_FOLDER} of container {CONTAINER_NAME}")

# Download the first image that has not been processed yet, then end the
# notebook with the local path of that image as its exit value. Files that
# fail are logged and skipped so that a later file can still be picked up.
downloaded_image_path = None
for blob in image_blobs:
    # NOTE(review): the tracking key is the base name only, so blobs with the
    # same file name in different sub-folders share one tracking entry -
    # confirm this is intended.
    file_name = blob.name.split('/')[-1]
    try:
        # Skip files that are already recorded in the tracking table
        if is_file_processed(file_name, table_client):
            logger.info(f"File already processed: {file_name}")
            continue

        # A random prefix keeps downloads with the same file name apart
        unique_id = str(uuid.uuid4())
        image_data_path = f"/dbfs/tmp/images/{unique_id}_{file_name}"  # Separate directory for images

        # Ensure the directory exists
        os.makedirs(os.path.dirname(image_data_path), exist_ok=True)

        # Download the image file
        blob_client = blob_service_client.get_blob_client(container=CONTAINER_NAME, blob=blob.name)
        blob_data = blob_client.download_blob().readall()

        # Save image data to a file in DBFS
        save_image_data_to_file(blob_data, image_data_path)

        # Record the file only after it has been written successfully
        mark_file_as_processed(file_name, table_client)

        logger.info(f"Successfully downloaded and saved file: {file_name} to {image_data_path}")
        print(f"Successfully downloaded and saved file: {file_name} to {image_data_path}")
    except Exception as e:
        # logger.exception keeps the traceback in the log file
        logger.exception(f"Failed to process file {file_name}: {e}")
        continue  # Skip to the next file if an error occurs

    # Reached only when the file was downloaded, saved and recorded
    downloaded_image_path = image_data_path
    break

if downloaded_image_path is None:
    logger.info("No new image file was downloaded in this run")
else:
    # Return the file path of the downloaded image (for downstream processing).
    # The call sits outside the try/except on purpose.
    # NOTE(review): presumably dbutils.notebook.exit stops the run by raising;
    # if so, the `except Exception` above could intercept it and report a
    # successful download as a failure - confirm against the runtime.
    dbutils.notebook.exit(downloaded_image_path)