In [13]:
import os
import hashlib
import logging
import gc
from datetime import datetime
from dotenv import load_dotenv
from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient
from azure.core.exceptions import ResourceExistsError
import tracemalloc
import sys
from concurrent_log_handler import ConcurrentRotatingFileHandler  # Updated log handler

In [14]:
# Load environment variables (python-dotenv) before reading the configuration below.
load_dotenv()

# Azure configuration
AZURE_ACCOUNT_NAME = os.getenv('AZURE_STORAGE_ACCOUNT_NAME')
AZURE_ACCOUNT_KEY = os.getenv('AZURE_STORAGE_ACCOUNT_KEY')
AZURE_CONTAINER_NAME = os.getenv('AZURE_CONTAINER_NAME')
LOCAL_FOLDER = os.getenv('LOCAL_FOLDER_PATH')

# Fail fast here rather than several cells later. An unset variable would
# otherwise produce an "https://None.blob.core.windows.net" URL or a None
# container name. A missing folder is worse: os.walk() silently yields nothing,
# so the sync would report zero local files instead of erroring.
_required_env = {
    'AZURE_STORAGE_ACCOUNT_NAME': AZURE_ACCOUNT_NAME,
    'AZURE_STORAGE_ACCOUNT_KEY': AZURE_ACCOUNT_KEY,
    'AZURE_CONTAINER_NAME': AZURE_CONTAINER_NAME,
    'LOCAL_FOLDER_PATH': LOCAL_FOLDER,
}
_missing_env = [name for name, value in _required_env.items() if not value]
if _missing_env:
    raise RuntimeError(f"Missing required environment variable(s): {', '.join(_missing_env)}")
if not os.path.isdir(LOCAL_FOLDER):
    raise NotADirectoryError(f"LOCAL_FOLDER_PATH is not a directory: {LOCAL_FOLDER}")

In [15]:
# Client for the storage account's blob endpoint, authenticated with the
# account key read from the environment in the config cell.
blob_service_client = BlobServiceClient(
    credential=AZURE_ACCOUNT_KEY,
    account_url=f"https://{AZURE_ACCOUNT_NAME}.blob.core.windows.net",
)

In [16]:
# Get a client for the target container and try to create it. A container that
# already exists is expected on every run after the first and is not an error.
container_client = blob_service_client.get_container_client(container=AZURE_CONTAINER_NAME)
try:
    container_client.create_container()
except ResourceExistsError:
    pass  # already provisioned -- nothing to do

In [17]:
# Start memory tracking.
# NOTE(review): no tracemalloc snapshot is taken in the cells shown; if nothing
# else reads it, this only adds allocation-tracing overhead.
tracemalloc.start()

# Initialize logger with file handler
log_file = "upload_sync.log"
logger = logging.getLogger(__name__)
# WARNING keeps the log terse: logger.info(...) calls elsewhere are filtered
# out, and only WARNING and above reach the file.
logger.setLevel(logging.WARNING)

formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")

# getLogger() returns the same logger object every time this cell is re-run in
# the same kernel. Attaching a new FileHandler on each run would duplicate every
# log line, so reuse a handler already attached for this file.
# NOTE(review): ConcurrentRotatingFileHandler is imported at the top of the
# notebook, but a plain FileHandler is used here.
_log_path = os.path.abspath(log_file)
handler = next(
    (
        existing
        for existing in logger.handlers
        if isinstance(existing, logging.FileHandler) and existing.baseFilename == _log_path
    ),
    None,
)
if handler is None:
    handler = logging.FileHandler(log_file)
    logger.addHandler(handler)
handler.setFormatter(formatter)

In [18]:
# Local file modification time, as a datetime.
def get_file_last_modified(file_path):
    """Return when ``file_path`` was last modified.

    The value comes from ``os.path.getmtime`` and is converted with
    ``datetime.fromtimestamp`` without a timezone, so it is a naive datetime
    in the machine's local time.
    """
    mtime_seconds = os.path.getmtime(file_path)
    return datetime.fromtimestamp(mtime_seconds)


In [19]:
# Upload a local file to blob storage, overwriting any existing blob.
def upload_blob_in_chunks(blob_client, file_path, max_concurrency=2):
    """Upload ``file_path`` through ``blob_client``, replacing any existing blob.

    The file is passed to ``upload_blob`` as an open binary stream rather than
    being read into memory here. Any splitting into chunks/blocks is performed
    by the Azure SDK, not by this function.

    Args:
        blob_client: Client for the destination blob (e.g. obtained from
            ``ContainerClient.get_blob_client``).
        file_path: Path of the local file to upload.
        max_concurrency: Number of parallel connections the SDK may use for
            this upload. Defaults to 2, the value previously hard-coded.
    """
    with open(file_path, "rb") as data:
        blob_client.upload_blob(data, overwrite=True, max_concurrency=max_concurrency)


In [20]:
# Process a batch of files
def process_batch(batch, blobs_metadata, container=None):
    """Upload each file in ``batch`` that is new, or newer than its existing blob.

    Args:
        batch: Iterable of ``(file_path, blob_name)`` pairs.
        blobs_metadata: Mapping of blob name -> last-modified datetime for blobs
            already in the container (built from ``list_blobs()`` by the caller).
        container: Container client to upload into. Defaults to the
            module-level ``container_client``, looked up at call time.

    Returns:
        ``(newly_added_count, overwritten_count)`` for this batch.

    Errors are logged per file and do not stop the rest of the batch.
    """
    if container is None:
        container = container_client

    newly_added_count = 0
    overwritten_count = 0

    for file_path, blob_name in batch:
        try:
            is_new = blob_name not in blobs_metadata
            if not is_new:
                # Compare as POSIX timestamps so the naive local mtime and the
                # blob's datetime are on the same scale.
                blob_last_modified = blobs_metadata[blob_name].timestamp()
                local_last_modified = get_file_last_modified(file_path).timestamp()
                if local_last_modified <= blob_last_modified:
                    continue  # blob is already up to date

            upload_blob_in_chunks(container.get_blob_client(blob_name), file_path)
            if is_new:
                newly_added_count += 1
            else:
                overwritten_count += 1

        except Exception as e:
            # Best-effort: record the failure and move on to the next file.
            logger.error(f"Error processing {file_path}: {e}")

    return newly_added_count, overwritten_count


In [21]:
# Generator to yield files in the local folder
def file_generator(root_folder):
    """Yield ``(file_path, blob_name)`` for every file under ``root_folder``.

    ``blob_name`` is the path relative to ``root_folder`` with the OS path
    separator replaced by ``/``, so the blob hierarchy mirrors the folder tree.
    Only ``os.sep`` is translated. On POSIX a backslash is an ordinary filename
    character and is kept as-is, rather than being turned into a virtual
    directory separator.

    Note: with its default arguments ``os.walk`` ignores directories it cannot
    list, and a non-existent ``root_folder`` yields nothing.
    """
    for current_dir, _subdirs, filenames in os.walk(root_folder):
        for filename in filenames:
            file_path = os.path.join(current_dir, filename)
            blob_name = os.path.relpath(file_path, root_folder).replace(os.sep, "/")
            yield file_path, blob_name

In [22]:
# Sync local files to Azure Blob and report
def sync_local_to_blob_and_report(batch_size_in_bytes=1024 * 1024, max_documents=1000):
    """Upload new/changed files from ``LOCAL_FOLDER`` to the container and log a summary.

    Files are accumulated into a batch until the batch's cumulative size
    reaches ``batch_size_in_bytes``; the batch is then handed to
    ``process_batch``. Once ``max_documents`` files have been processed since
    the last refresh, the blob metadata snapshot is re-fetched from the
    container. Leftover files are processed after the walk.

    Args:
        batch_size_in_bytes: Cumulative file size at which a batch is processed.
        max_documents: Number of processed files after which ``blobs_metadata``
            is refreshed from ``container_client.list_blobs()``.

    Files whose size cannot be read (e.g. broken symlinks, which ``os.walk``
    reports as files, or files removed after the walk) are logged and skipped
    instead of aborting the whole sync.
    """
    logger.info("Starting sync with Azure Blob Storage...")

    # 1. Fetch the initial blob metadata and count
    blobs_metadata = {blob.name: blob.last_modified for blob in container_client.list_blobs()}
    total_blobs_in_container = len(blobs_metadata)  # Initial blob count
    logger.warning(f"Total blobs found in container: {total_blobs_in_container}")

    # 2. Get the total number of local files
    total_local_files = sum(1 for _ in file_generator(LOCAL_FOLDER))
    logger.warning(f"Total local files to sync: {total_local_files}")

    total_newly_added_count = 0
    total_overwritten_count = 0
    current_batch = []
    current_batch_size = 0
    total_uploaded = 0  # files processed since the last metadata refresh

    # Process files in batches
    for file_path, blob_name in file_generator(LOCAL_FOLDER):
        try:
            file_size = os.path.getsize(file_path)
        except OSError as e:
            # One unreadable entry must not abort the whole sync.
            logger.error(f"Error processing {file_path}: {e}")
            continue
        current_batch.append((file_path, blob_name))
        current_batch_size += file_size

        # Process batch when size exceeds limit
        if current_batch_size >= batch_size_in_bytes:
            newly_added, overwritten = process_batch(current_batch, blobs_metadata)
            total_newly_added_count += newly_added
            total_overwritten_count += overwritten
            total_uploaded += len(current_batch)

            # Reset batch
            current_batch.clear()
            current_batch_size = 0

            # Free memory after each batch
            gc.collect()

        # Refresh the metadata snapshot once max_documents files have been processed
        if total_uploaded >= max_documents:
            total_uploaded = 0
            blobs_metadata = {blob.name: blob.last_modified for blob in container_client.list_blobs()}
            gc.collect()

    # Process remaining files in the final batch
    if current_batch:
        newly_added, overwritten = process_batch(current_batch, blobs_metadata)
        total_newly_added_count += newly_added
        total_overwritten_count += overwritten
        current_batch.clear()
        gc.collect()

    # 3. Count the final blobs without materializing the whole listing
    final_blob_count = sum(1 for _ in container_client.list_blobs())

    # 4. Log the summary
    logger.warning("----- Sync Summary -----")
    logger.warning(f"Initial number of blobs: {total_blobs_in_container}")
    logger.warning(f"Total local files: {total_local_files}")
    logger.warning(f"Total newly added files: {total_newly_added_count}")
    logger.warning(f"Total overwritten files: {total_overwritten_count}")
    logger.warning(f"Final number of blobs in storage: {final_blob_count}")
    logger.warning("------------------------")

    logger.info("Sync process complete.")

# Entry point: runs when executed as a script, and also when this cell runs in
# a notebook kernel (IPython executes cells in the __main__ namespace).
if __name__ == "__main__":
    sync_local_to_blob_and_report(batch_size_in_bytes=2 ** 20)  # 1 MiB batches