In [None]:
%pip install azure-monitor-query

In [2]:
import logging
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from azure.identity import ClientSecretCredential, DefaultAzureCredential
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
from azure.storage.blob import BlobServiceClient
from azure.core.exceptions import ResourceExistsError
import time
import pandas as pd
import io
import re


# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("ExportPipeline")

# Azure auth: DefaultAzureCredential tries its credential chain (environment
# configuration, managed identity, ...) and uses the first source that works.
credential = DefaultAzureCredential()

# Clients
workspace_id = "3d11f3ed-03e8-4ce4-97ab-85229f241e0a"
logs_client = LogsQueryClient(credential)
storage_account_name = "<storage_account_name>"
# Derive the endpoint from the variable above so the account name is set in one place only.
account_url = f"https://{storage_account_name}.blob.core.windows.net"
blob_service_client = BlobServiceClient(
    account_url,
    credential=credential,
    retry_total=5, retry_connect=5, retry_read=5, retry_status=5
)

INFO:azure.identity._credentials.environment:No environment configuration found.
INFO:azure.identity._credentials.managed_identity:ManagedIdentityCredential will use Azure ML managed identity


In [3]:
def export_table(table_name, start_time, end_time, time_chunk):
    """Export one Log Analytics table to Blob Storage as a single JSON Lines blob.

    The half-open range ``[start_time, end_time)`` is queried in consecutive
    windows of ``time_chunk``. Rows from every window are concatenated and
    uploaded as ``{table_name}_{start date}_{end date}.json`` into a container
    whose name is derived from ``table_name``.

    Relies on the module-level ``logger``, ``logs_client``, ``workspace_id`` and
    ``blob_service_client``.

    Args:
        table_name: Name of the workspace table to query.
        start_time: Inclusive start of the export range (datetime).
        end_time: Exclusive end of the export range (datetime).
        time_chunk: Positive ``timedelta``; width of each query window.

    Returns:
        None. A window whose query fails on every attempt is logged and
        skipped; an upload that fails on every attempt is logged.

    Raises:
        ValueError: If ``time_chunk`` is zero or negative.
    """
    # A zero or negative step would never advance `current`, so the loop below would never end.
    if time_chunk <= timedelta(0):
        raise ValueError(f"time_chunk must be a positive timedelta, got {time_chunk!r}")

    max_attempts = 3

    logger.info(f"Starting export for {table_name}")
    # Ensure container exists. Container names only allow lowercase letters,
    # digits and hyphens, so drop every other character (including whitespace).
    container_name = re.sub(r'[^a-z0-9]', '', table_name.lower())
    try:
        container_client = blob_service_client.create_container(container_name)
    except ResourceExistsError:
        container_client = blob_service_client.get_container_client(container_name)

    # Collect data in chunks
    current = start_time
    all_chunks = []
    while current < end_time:
        next_time = min(current + time_chunk, end_time)
        # Half-open window [current, next_time): KQL `between` includes both ends,
        # which would export a row sitting exactly on a boundary twice.
        kql = (
            f"{table_name} | where TimeGenerated >= datetime({current.isoformat()}) "
            f"and TimeGenerated < datetime({next_time.isoformat()})"
        )
        logger.info(f"Querying {table_name} from {current} to {next_time}")
        # Simple retry logic for query (exponential backoff between attempts)
        for attempt in range(max_attempts):
            try:
                resp = logs_client.query_workspace(workspace_id, query=kql, timespan=(current, next_time))
                break
            except Exception as err:
                logger.warning(f"Query attempt {attempt+1} failed: {err}")
                # No point in waiting after the final attempt.
                if attempt < max_attempts - 1:
                    time.sleep(2 ** attempt)
        else:
            logger.error(f"All query attempts failed for range {current} - {next_time}; skipping")
            current = next_time
            continue

        # Handle response: a partial result still carries usable rows.
        if resp.status == LogsQueryStatus.SUCCESS:
            tables = resp.tables
        else:
            logger.warning(f"Partial result for {table_name} at {current}: {resp.partial_error}")
            tables = resp.partial_data

        # Convert to DataFrame
        for table in tables or []:
            all_chunks.append(pd.DataFrame(data=table.rows, columns=table.columns))
        current = next_time

    if not all_chunks:
        logger.info(f"No data for {table_name}")
        return

    df_table = pd.concat(all_chunks, ignore_index=True)
    logger.info(f"Exported {len(df_table)} rows for {table_name}")

    # Upload DataFrame as JSON (one record per line)
    json_bytes = df_table.to_json(orient='records', lines=True).encode('utf-8')
    blob_name = f"{table_name}_{start_time.date()}_{end_time.date()}.json"
    blob_client = container_client.get_blob_client(blob=blob_name)

    # Retry on upload; a fresh BytesIO per attempt so each try starts at offset 0.
    for attempt in range(max_attempts):
        try:
            blob_client.upload_blob(io.BytesIO(json_bytes), overwrite=True)
            logger.info(f"Uploaded {blob_name} to container {container_name}")
            break
        except Exception as err:
            logger.warning(f"Upload attempt {attempt+1} failed: {err}")
            if attempt < max_attempts - 1:
                time.sleep(2 ** attempt)
    else:
        logger.error(f"Failed to upload {blob_name} after retries")

# List of tables and ranges to export
tables = ["ISSAAPI_CL"]
# (Year, Month, Day)
# Starts at the beginning of this day
start_time = datetime(2025, 7, 14)
# Ends before the start of this day
end_time   = datetime(2025, 7, 22)
# How often to query data
time_chunk = timedelta(days=1)

# Parallel export: one worker per table. ThreadPoolExecutor rejects
# max_workers=0, so keep at least one worker even when `tables` is empty.
failed_tables = []
with ThreadPoolExecutor(max_workers=max(1, len(tables))) as executor:
    futures = [
        executor.submit(export_table, tbl, start_time, end_time, time_chunk)
        for tbl in tables
    ]
    # Remember which table each future belongs to so failures can be attributed.
    table_by_future = dict(zip(futures, tables))
    for f in as_completed(futures):
        err = f.exception()
        if err is not None:
            failed_tables.append(table_by_future[f])
            logger.error(f"Error in export of {table_by_future[f]}: {err}")

# Only report success when no export raised.
if failed_tables:
    logger.error(f"{len(failed_tables)} of {len(tables)} exports failed: {failed_tables}")
else:
    logger.info("All exports completed.")

INFO:ExportPipeline:Starting export for ISSAAPI_CL
INFO:azure.identity._credentials.chained:DefaultAzureCredential acquired a token from ManagedIdentityCredential
INFO:ExportPipeline:Querying ISSAAPI_CL from 2025-07-14 00:00:00 to 2025-07-15 00:00:00
INFO:azure.identity._internal.msal_managed_identity_client:AzureMLCredential.get_token_info succeeded
INFO:azure.identity._internal.decorators:ManagedIdentityCredential.get_token_info succeeded
INFO:azure.identity._credentials.default:DefaultAzureCredential acquired a token from ManagedIdentityCredential
INFO:ExportPipeline:Querying ISSAAPI_CL from 2025-07-15 00:00:00 to 2025-07-16 00:00:00
INFO:ExportPipeline:Querying ISSAAPI_CL from 2025-07-16 00:00:00 to 2025-07-17 00:00:00
INFO:ExportPipeline:Querying ISSAAPI_CL from 2025-07-17 00:00:00 to 2025-07-18 00:00:00
INFO:ExportPipeline:Querying ISSAAPI_CL from 2025-07-18 00:00:00 to 2025-07-19 00:00:00
INFO:ExportPipeline:Querying ISSAAPI_CL from 2025-07-19 00:00:00 to 2025-07-20 00:00:00
INF