# Azure Log Analytics Data Export

This notebook demonstrates exporting large volumes of data (50M+ rows per day) from Azure Log Analytics to Blob Storage.

Inputs and Outputs:
- <b>Input:</b> table(s), columns, and date range
- <b>Output:</b> json, csv, or parquet files

Summary:
1. <b>Generate Test Data:</b> generates fake data and ingests it into Log Analytics for testing
2. <b>Split Query and Send to Queue:</b> divides the export request into smaller queries (jobs) and sends them to a storage queue
3. <b>Process Queue:</b> runs the jobs from the storage queue and saves the output to a Blob Storage container

# 1. Ingest Test Data

## Setup

In [None]:
import json
import logging
import os
import random
import string
import time

import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.monitor.ingestion import LogsIngestionClient
from azure.monitor.query import LogsQueryClient, LogsQueryStatus

In [None]:
# filesystem-safe timestamp for the log filename (date/time punctuation removed)
logging_timestamp = str(pd.Timestamp.today())
logging_timestamp = (
    logging_timestamp.replace("-", "").replace(":", "").replace(".", "").replace(" ", "")
)
# force=True replaces any handlers already attached to the root logger: without it
# basicConfig is a silent no-op once the root logger has a handler, so re-running
# this cell (or another section in the same kernel) would keep logging to the old file
logging.basicConfig(
    filename=f"log-analytics-ingest-{logging_timestamp}.log",
    format="%(asctime)s %(levelname)s %(message)s",
    filemode="w",
    force=True,
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [None]:
def print_log(input: str) -> None:
    """Echo a message to stdout, then record it at INFO level through `logger`."""
    for emit in (print, logger.info):
        emit(input)


def query_log_qnalytics_request(
    workspace_id: str, client: LogsQueryClient, kql_query: str
) -> pd.DataFrame:
    """
    Makes API query request to log analytics and returns the first result table
    limits: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/api/timeouts
    API query limits:
        500,000 rows per request
        200 requests per 30 seconds
        max query time is 10 mins
        100MB data max per request
    Args:
        workspace_id: log analytics workspace id
        client: authenticated LogsQueryClient
        kql_query: KQL text; any time filter must be inside the query (timespan=None)
    Returns:
        pandas dataframe built from the first table of the response
    Raises:
        Exception: if the request fails, or completes with a non-success status
    """
    # keep the try narrow: if the call raises, `response` is never bound, so the
    # handler must report the caught error rather than response.status
    try:
        response = client.query_workspace(
            workspace_id=workspace_id,
            query=kql_query,
            timespan=None,
            server_timeout=600,
        )
    except Exception as e:
        raise Exception(f"Failed Request, Exception: {e}") from e
    if response.status != LogsQueryStatus.SUCCESS:
        # partial results expose the server-side error as `partial_error`
        partial_error = getattr(response, "partial_error", None)
        raise Exception(
            f"Unsucessful Request, Exception: {response.status}, {partial_error}"
        )
    table = response.tables[0]
    return pd.DataFrame(data=table.rows, columns=table.columns)


def query_log_analytics_connection_request(
    credential: DefaultAzureCredential, workspace_id: str, kql_query: str
) -> pd.DataFrame:
    """
    Opens a Log Analytics query client with the given credential and runs one KQL query
        note: need to add Log Analytics Contributor role
        note: need to add Log Analytics Publisher role
    Returns:
        pandas dataframe with the query results
    """
    return query_log_qnalytics_request(
        workspace_id, LogsQueryClient(credential), kql_query
    )


def generate_fake_data_log_analtyics(
    start_date: str,
    timedelta_seconds: float,
    number_of_rows: int,
    number_of_columns: int,
    random_length: int = 10,
) -> pd.DataFrame:
    """
    Builds a dataframe of fake rows for Log Analytics ingestion tests
    Args:
        start_date: datetime of the first row (any format pd.to_datetime accepts)
        timedelta_seconds: seconds between consecutive rows; may be fractional
            (e.g. 0.0018) or very small (e.g. 1e-05)
        number_of_rows: number of rows to generate
        number_of_columns: total number of columns, including TimeGenerated
        random_length: length of the random lowercase string for each data column
    Returns:
        pandas dataframe with a string TimeGenerated column ("%Y-%m-%d %H:%M:%S.%f")
        followed by DataColumn1 ... DataColumn{number_of_columns - 1}
        note: each data column holds one random string repeated on every row
    """
    # evenly spaced timestamps; unit="s" accepts any int/float, whereas building a
    # "{value}s" string fails for floats that format in scientific notation
    start_datetime = pd.to_datetime(start_date)
    step = pd.to_timedelta(timedelta_seconds, unit="s")
    fake_time_column = start_datetime + pd.Series(range(number_of_rows)) * step
    fake_data_df = pd.DataFrame({"TimeGenerated": fake_time_column})
    for each_index in range(1, number_of_columns):
        each_column_value = "".join(
            random.choices(string.ascii_lowercase, k=random_length)
        )
        # a scalar is broadcast to every row, which keeps 50M+ row generation cheap
        fake_data_df[f"DataColumn{each_index}"] = each_column_value
    # convert datetime to string column to avoid issues in log analytics
    fake_data_df["TimeGenerated"] = fake_data_df["TimeGenerated"].dt.strftime(
        "%Y-%m-%d %H:%M:%S.%f"
    )
    # status (memory_usage is shallow: string columns are counted as pointers only)
    print(f"Fake Data Shape: {fake_data_df.shape}")
    print(f"Size: {fake_data_df.memory_usage().sum() / 1_000_000} MBs")
    print(f"First Datetime: {fake_data_df['TimeGenerated'].iloc[0]}")
    print(f"Last Datetime: {fake_data_df['TimeGenerated'].iloc[-1]}")
    return fake_data_df


def log_analytics_ingest(
    fake_data_df: pd.DataFrame,
    ingest_client: LogsIngestionClient,
    rule_id: str,
    stream_name: str,
) -> None:
    """
    Uploads every row of a dataframe to Log Analytics through a data collection rule
    Args:
        fake_data_df: rows to upload, one log record per row
        ingest_client: LogsIngestionClient bound to the data collection endpoint
        rule_id: data collection rule id ("dcr-...")
        stream_name: data collection rule stream ("Custom-{tablename}")
    """
    # round-trip through JSON text so datetimes become ISO strings and every value
    # is a plain python type
    records_json = fake_data_df.to_json(orient="records", date_format="iso")
    logs = json.loads(records_json)
    ingest_client.upload(rule_id=rule_id, stream_name=stream_name, logs=logs)


def generate_and_ingest_fake_date(
    credential: DefaultAzureCredential,
    workspace_id: str,
    endpoint: str,
    rule_id: str,
    stream_name: str,
    start_date: str,
    timedelta_seconds: int,
    number_of_rows: int,
    number_of_columns: int = 10,
) -> pd.DataFrame:
    """
    Generates fake data and ingests it into Log Analytics for testing
        note: credential requires Log Analytics Contributor and Publisher roles
        note: 10M rows with 10 columns takes about 15-20 minutes
    Log Analytics Data Collection Endpoint and Rule setup:
        1. azure portal -> monitor -> create data collection endpoint
        2. azure portal -> log analytics -> tables -> create new custom table
        3. create data collection rule and add publisher role permissions
        reference: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/tutorial-logs-ingestion-portal
    Args:
        credential: DefaultAzureCredential
        workspace_id: log analytics workspace id (not used by this function)
            format: "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
        endpoint: data collection endpoint url
            format: "https://{name}-XXXX.eastus-1.ingest.monitor.azure.com"
        rule_id: data collection rule id
            format: "dcr-XXXXXXXXXXXXXXXXXXXXXXXXXXXX"
        stream_name: data collection rule stream
            format: "Custom-{tablename}"
        start_date: datetime of the first fake row
            format: "02-08-2024 00:00:00.000000"
            note: can only ingest dates up to 2 days in the past and 1 day into the future
            reference: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns
        timedelta_seconds: time between consecutive fake rows
        number_of_rows: total number of rows to generate (at least 2)
        number_of_columns: total number of columns to generate (at least 2)
            note: for new columns, update the schema before ingestion
            1. azure portal -> log analytics -> settings -> tables -> ... -> edit schema
            2. azure portal -> data collection rules -> export template -> deploy -> edit
    Returns:
        pandas dataframe of the generated rows
    Raises:
        Exception: if number_of_rows or number_of_columns is below 2
    """
    # an out-of-window start date is allowed, but only with a warning
    requested_start = pd.to_datetime(start_date)
    now = pd.to_datetime("today")
    window_opens = now - pd.to_timedelta("2D")
    window_closes = now + pd.to_timedelta("1D")
    inside_window = window_opens <= requested_start <= window_closes
    if not inside_window:
        print_log("Warning: Date given is outside allowed ingestion range")
        print_log("Note: Log Analytics will use ingest time as TimeGenerated")
    too_small = number_of_rows < 2 or number_of_columns < 2
    if too_small:
        raise Exception("invalid row and/or column numbers")
    ingest_client = LogsIngestionClient(endpoint, credential)
    # generation and upload are timed together
    print_log("Generating Fake Data...")
    started_at = time.time()
    fake_data_df = generate_fake_data_log_analtyics(
        start_date, timedelta_seconds, number_of_rows, number_of_columns
    )
    print_log("Sending to Log Analytics...")
    log_analytics_ingest(fake_data_df, ingest_client, rule_id, stream_name)
    elapsed_seconds = time.time() - started_at
    print_log(f"Runtime: {round(elapsed_seconds,1)} seconds")
    return fake_data_df

## Authentication

In [None]:
# auth
# choose one way to sign in before DefaultAzureCredential is created in the Run section:
# 1. service principal: uncomment and fill in these environment variables
#    (avoid saving real secrets in the notebook)
# os.environ["AZURE_CLIENT_ID"] = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
# os.environ["AZURE_TENANT_ID"] = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
# os.environ["AZURE_CLIENT_SECRET"] = "XXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# 2. command line: interactive Azure CLI login (`!` runs it as a shell command in Jupyter)
!az login

## Inputs

In [None]:
# connection (value formats are listed in the generate_and_ingest_fake_date docstring)
log_analytics_workspace_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
log_analytics_data_collection_endpoint = (
    "https://XXXXXXXXX-XXXXXX.eastus-1.ingest.monitor.azure.com"
)
log_analytics_data_collection_rule_id = "dcr-XXXXXXXXXXXXXXXXXXXXXXXXXXX"
log_analytics_data_collection_stream_name = "Custom-XXXXXXXXXXXXX_CL"
# params
# start date should be within 2 days before / 1 day after now; otherwise only a
# warning is printed and Log Analytics uses the ingest time as TimeGenerated
start_datetime = "02-21-2024 00:00:00.000000"
# seconds between rows: 50M rows * 0.0018 s = 90,000 s, i.e. ~25 hours of fake data
time_delta_seconds = 0.0018
number_of_rows = 50_000_000

## Run

In [None]:
# credential shared by the ingestion call and the verification query below
credential = DefaultAzureCredential()

In [None]:
# generate the fake rows and upload them; the dataframe is kept for inspection
fake_data_df = generate_and_ingest_fake_date(
    credential=credential,
    workspace_id=log_analytics_workspace_id,
    endpoint=log_analytics_data_collection_endpoint,
    rule_id=log_analytics_data_collection_rule_id,
    stream_name=log_analytics_data_collection_stream_name,
    start_date=start_datetime,
    timedelta_seconds=time_delta_seconds,
    number_of_rows=number_of_rows,
)

## Verify

In [None]:
# query log analytics to confirm ingest
# the table name is the stream name without its "Custom-" prefix; removeprefix
# leaves a name without that prefix intact instead of blindly chopping 7 characters
log_analytics_table_name = log_analytics_data_collection_stream_name.removeprefix(
    "Custom-"
)
test_kql_query = f"""
{log_analytics_table_name}
| project-away TenantId, Type, _ResourceId
| sort by TimeGenerated desc
| take 100
"""
query_log_analytics_connection_request(
    credential,
    log_analytics_workspace_id,
    test_kql_query,
)

# 2. Split Query and Send to Queue

## Setup

In [None]:
import json
import logging
import os
import time

import pandas as pd
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
from azure.storage.queue import QueueClient

In [None]:
# filesystem-safe timestamp for the log filename (date/time punctuation removed)
logging_timestamp = str(pd.Timestamp.today())
logging_timestamp = (
    logging_timestamp.replace("-", "").replace(":", "").replace(".", "").replace(" ", "")
)
# force=True replaces any handlers already attached to the root logger: without it
# basicConfig is a silent no-op once the root logger has a handler, so running this
# section in the same kernel as another one would keep logging to the old file
logging.basicConfig(
    filename=f"log-analytics-send-{logging_timestamp}.log",
    format="%(asctime)s %(levelname)s %(message)s",
    filemode="w",
    force=True,
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [None]:
def print_log(input: str) -> None:
    """Echo a message to stdout, then record it at INFO level through `logger`."""
    for emit in (print, logger.info):
        emit(input)


def query_log_qnalytics_request(
    workspace_id: str, client: LogsQueryClient, kql_query: str
) -> pd.DataFrame:
    """
    Makes API query request to log analytics and returns the first result table
    limits: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/api/timeouts
    API query limits:
        500,000 rows per request
        200 requests per 30 seconds
        max query time is 10 mins
        100MB data max per request
    Args:
        workspace_id: log analytics workspace id
        client: authenticated LogsQueryClient
        kql_query: KQL text; any time filter must be inside the query (timespan=None)
    Returns:
        pandas dataframe built from the first table of the response
    Raises:
        Exception: if the request fails, or completes with a non-success status
    """
    # keep the try narrow: if the call raises, `response` is never bound, so the
    # handler must report the caught error rather than response.status
    try:
        response = client.query_workspace(
            workspace_id=workspace_id,
            query=kql_query,
            timespan=None,
            server_timeout=600,
        )
    except Exception as e:
        raise Exception(f"Failed Request, Exception: {e}") from e
    if response.status != LogsQueryStatus.SUCCESS:
        # partial results expose the server-side error as `partial_error`
        partial_error = getattr(response, "partial_error", None)
        raise Exception(
            f"Unsucessful Request, Exception: {response.status}, {partial_error}"
        )
    table = response.tables[0]
    return pd.DataFrame(data=table.rows, columns=table.columns)


def query_log_analytics_get_table_columns(
    table_names_and_columns: dict, workspace_id: str, client: LogsQueryClient
) -> dict:
    """
    Resolves the columns to export for every requested table
        note: TimeGenerated is always included; it is placed first whenever it is
        added or discovered
    Args:
        table_names_and_columns: {"table_name": [column, ...]}; an empty list means
            "discover all columns" by querying one row of the table
        workspace_id: log analytics workspace id
        client: LogsQueryClient used for the schema lookups
    Returns:
        dictionary of table name -> column list; tables whose lookup fails (most
        likely table not found) are logged and left out
    Raises:
        Exception: if no table could be resolved
    """
    resolved = {}
    for table, columns in table_names_and_columns.items():
        if columns:
            # explicit column list: only prepend TimeGenerated when it is missing
            resolved[table] = (
                columns if "TimeGenerated" in columns else ["TimeGenerated"] + columns
            )
            continue
        # no column list: read one row to discover the table's columns
        try:
            kql = f"""
                let TABLE_NAME = "{table}";
                table(TABLE_NAME)
                | project-away TenantId, Type, _ResourceId
                | take 1
                """
            discovered = list(
                query_log_qnalytics_request(workspace_id, client, kql).columns
            )
            # move TimeGenerated to the front
            discovered.remove("TimeGenerated")
            resolved[table] = ["TimeGenerated", *discovered]
        except Exception as e:
            print_log(f"{table}, Failed Request, Exception: {e}")
    if not resolved:
        raise Exception("No valid table names")
    return resolved


def query_log_analytics_get_time_ranges(
    workspace_id: str,
    client: LogsQueryClient,
    table_name: str,
    start_datetime: str,
    end_datetime: str,
    query_row_limit: int,
) -> pd.DataFrame:
    """
    Splits [start_datetime, end_datetime) of a table into consecutive time ranges
    of at most ~query_row_limit rows each
    Args:
        workspace_id: log analytics workspace id
        client: LogsQueryClient
        table_name: table to split
        start_datetime: range start, inclusive (inserted into a KQL datetime())
        end_datetime: range end, exclusive (inserted into a KQL datetime())
        query_row_limit: target number of rows between split points
    Returns:
        dataframe with string columns StartTime and EndTime, one row per range, to be
        queried as [StartTime, EndTime); the final EndTime is pushed one 100ns tick past
        the last event so that event is included. Empty dataframe if the request fails
        or the range holds no rows.
    """
    # table_size must count only the rows inside the requested range: it marks the last
    # row of the range as a split point. Counting the whole table meant the final split
    # was missed whenever the range was smaller than the table, silently dropping every
    # row after the last multiple of QUERY_ROW_LIMIT.
    # A single-row range yields one split point with no successor; it is kept as a
    # (t, t) pair and widened by the EndTime fix below.
    # KQL output is converted to strings so datetime digits don't get truncated.
    kql_query = f"""
    let TABLE_NAME = "{table_name}";
    let START_DATETIME = datetime({start_datetime});
    let END_DATETIME = datetime({end_datetime});
    let QUERY_ROW_LIMIT = {query_row_limit};
    let table_size = toscalar(
        table(TABLE_NAME)
        | where (TimeGenerated >= START_DATETIME) and (TimeGenerated < END_DATETIME)
        | count
    );
    let time_splits = table(TABLE_NAME)
    | project TimeGenerated
    | where (TimeGenerated >= START_DATETIME) and (TimeGenerated < END_DATETIME)
    | order by TimeGenerated asc
    | extend row_index = row_number()
    | where row_index == 1 or row_index % (QUERY_ROW_LIMIT) == 0 or row_index == table_size;
    let time_pairs = time_splits
    | project StartTime = TimeGenerated
    | extend EndTime = next(StartTime)
    | where isnotnull(EndTime) or table_size == 1
    | extend EndTime = iff(isnull(EndTime), StartTime, EndTime)
    | extend StartTime = tostring(StartTime), EndTime = tostring(EndTime);
    time_pairs
    """
    try:
        df = query_log_qnalytics_request(workspace_id, client, kql_query)
    # request unsuccessful, most likely table not found
    except Exception as e:
        print_log(f"{table_name}, Failed Request, Exception: {e}")
        return pd.DataFrame()
    # table exists, but no results returned
    if df.shape[0] == 0:
        return pd.DataFrame()
    # the last range ends exactly on the final event; add one 100ns tick so the
    # half-open [StartTime, EndTime) query still includes it, then write it back
    # in the "...T...Z" form
    final_endtime = df["EndTime"].iloc[-1]
    new_final_endtime = str(pd.to_datetime(final_endtime) + pd.to_timedelta("0.0000001s"))
    new_final_endtime_fix_format = new_final_endtime.replace(" ", "T").replace(
        "00+00:00", "Z"
    )
    # assign through .loc on the frame itself: the chained form
    # df["EndTime"].iloc[-1] = ... does not modify df under pandas Copy-on-Write
    df.loc[df.index[-1], "EndTime"] = new_final_endtime_fix_format
    return df


def query_log_analytics_get_table_count(
    workspace_id: str,
    client: LogsQueryClient,
    table_name: str,
    start_datetime: str,
    end_datetime: str,
) -> int:
    """
    Counts the rows of a table with TimeGenerated in [start_datetime, end_datetime)
    Returns:
        row count as a python int (the dataframe cell is likely a numpy integer, which
        is not an int subclass and is not JSON serializable)
    Raises:
        Exception: if the query request fails
    """
    kql_query = f"""
    let TABLE_NAME = "{table_name}";
    let START_DATETIME = datetime({start_datetime});
    let END_DATETIME = datetime({end_datetime});
    table(TABLE_NAME)
    | project TimeGenerated
    | where (TimeGenerated >= START_DATETIME) and (TimeGenerated < END_DATETIME)
    | count
    """
    df = query_log_qnalytics_request(workspace_id, client, kql_query)
    return int(df.iloc[0, 0])


def query_log_analytics_add_table_row_counts(
    input_df: pd.DataFrame,
    workspace_id: str,
    client: LogsQueryClient,
    table_name: str,
) -> pd.DataFrame:
    """
    Adds a Count column holding the row count of each [StartTime, EndTime) range
        note: issues one count query per row and modifies input_df in place
    Returns:
        the same dataframe object, with the Count column added
    """
    input_df["Count"] = [
        query_log_analytics_get_table_count(
            workspace_id, client, table_name, time_range.StartTime, time_range.EndTime
        )
        for time_range in input_df.itertuples()
    ]
    return input_df


def query_log_analytics(
    workspace_id: str,
    client: LogsQueryClient,
    table_name: str,
    start_datetime: str,
    end_datetime: str,
    query_row_limit: int,
    query_row_limit_correction: int,
    add_row_counts: bool,
) -> pd.DataFrame:
    """
    Builds the query jobs (time ranges) for one table and one date range
    Returns:
        dataframe with Table, StartTime, EndTime (and Count when add_row_counts);
        an empty dataframe when there is nothing to query
    Raises:
        Exception: if add_row_counts is set and any range exceeds query_row_limit
    """
    # split below the hard limit to leave headroom for many events sharing a
    # timestamp at a split point
    results_df = query_log_analytics_get_time_ranges(
        workspace_id,
        client,
        table_name,
        start_datetime,
        end_datetime,
        query_row_limit - query_row_limit_correction,
    )
    if len(results_df) == 0:
        return pd.DataFrame()
    if add_row_counts:
        results_df = query_log_analytics_add_table_row_counts(
            results_df, workspace_id, client, table_name
        )
        # a range over the hard limit means the correction factor was too small
        if (results_df["Count"] > query_row_limit).any():
            raise Exception("Sub-Query exceeds query row limit, change limit params")
    results_df.insert(loc=0, column="Table", value=[table_name] * len(results_df))
    return results_df


def break_up_date_range_days(
    table_name: str,
    start_datetime: str,
    end_datetime: str,
) -> pd.DataFrame:
    """
    Splits [start_datetime, end_datetime) into consecutive ranges of at most one day
    Args:
        table_name: value written to the "table" column of every row
        start_datetime: first boundary (any format pd.date_range accepts)
        end_datetime: last boundary; always the end of the final range
    Returns:
        dataframe with string columns table, start_date, end_date. Day boundaries are
        anchored on start_datetime's time of day, and the final range is cut short at
        end_datetime. Empty when end_datetime is not after start_datetime.
    """
    # one boundary per day, starting at start_datetime and not passing end_datetime
    day_starts = pd.date_range(start=start_datetime, end=end_datetime, freq="D")
    boundaries = [str(each) for each in day_starts]
    # close the final (partial) day at end_datetime unless a boundary already lands
    # on it. Comparing timestamps rather than checking for a "00:00:00" suffix keeps
    # the tail when start_datetime is not at midnight, and avoids zero-length ranges
    # when end_datetime is written without a time part.
    if len(day_starts) == 0 or day_starts[-1] != pd.Timestamp(end_datetime):
        boundaries.append(end_datetime)
    # pair consecutive boundaries into [start, end) ranges
    time_pairs = list(zip(boundaries[:-1], boundaries[1:]))
    df_time_pairs = pd.DataFrame(time_pairs, columns=["start_date", "end_date"])
    df_time_pairs.insert(loc=0, column="table", value=[table_name] * len(df_time_pairs))
    return df_time_pairs


def break_up_query(
    table_names: list[str],
    start_datetime: str,
    end_datetime: str,
) -> pd.DataFrame:
    """
    Splits the export into (table, one-day date range) jobs for every table
    Returns:
        concatenation of the per-table frames (columns table, start_date, end_date);
        each table's rows keep their own 0-based index
    """
    per_table_ranges = [
        break_up_date_range_days(name, start_datetime, end_datetime)
        for name in table_names
    ]
    return pd.concat(per_table_ranges)


def query_log_analytics_send_to_queue(
    credential: DefaultAzureCredential,
    subscription_id: str,
    resource_group: str,
    worksapce_name: str,
    workspace_id: str,
    storage_queue_url: str,
    table_names_and_columns: dict,
    start_datetime: str,
    end_datetime: str,
    query_row_limit: int = 500_000,
    query_row_limit_correction: int = 1_000,
    request_wait_seconds: float = 0.05,
    add_row_counts: bool = True,
) -> None:
    """
    Splits a Log Analytics export request into query jobs and sends each job to a storage queue
        note: credential requires Log Analytics Contributor and Storage Queue Data Contributor roles
        note: date range is processed as [start_datetime, end_datetime)
        note: each message is a JSON object with keys Table, Columns, Subscription,
            ResourceGroup, LogAnalyticsWorkspace, LogAnalyticsWorkspaceId, StartTime,
            EndTime and, when add_row_counts is True, Count
    Args:
        credential: azure default credential object
        subscription_id: azure subscription id, copied into each message
            format: "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
        resource_group: azure resource group, copied into each message
        worksapce_name: name of log analytics workspace, copied into each message
            (parameter spelling kept for backward compatibility)
        workspace_id: log analytics workspace id
            format: "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
        storage_queue_url: storage account queue url
            format: "https://{storage_account_name}.queue.core.windows.net/{queue_name}"
        table_names_and_columns: dictionary of table names with columns to project
            note: blank column list will detect and use all columns
            format: {"table_name" : ["column_1", "column_2", ... ], ... }
        start_datetime: starting datetime, inclusive
            format: YYYY-MM-DD HH:MM:SS
        end_datetime: ending datetime, exclusive
            format: YYYY-MM-DD HH:MM:SS
        query_row_limit: max number of rows for each follow-up query/message
        query_row_limit_correction: headroom subtracted from query_row_limit when
            splitting, in case many rows share a timestamp at a split point
        request_wait_seconds: wait time between queue send requests
        add_row_counts: adds expected row count for queries to messages
    Returns:
        None
    Raises:
        Exception: invalid datetimes, no valid tables, or (when add_row_counts) a
            sub-query over query_row_limit
    """
    # input validation
    try:
        pd.to_datetime(start_datetime)
        pd.to_datetime(end_datetime)
    except Exception as e:
        raise Exception(f"Invalid Datetime Format, Exception {e}") from e
    # log analytics connection (needs Log Analytics Contributor role)
    log_client = LogsQueryClient(credential)
    # storage queue connection (needs Storage Queue Data Contributor role)
    queue_client = QueueClient.from_queue_url(storage_queue_url, credential)
    # resolve the columns to export per table (raises if no table is valid)
    table_names_and_columns = query_log_analytics_get_table_columns(
        table_names_and_columns, workspace_id, log_client
    )
    # break up queries by table and by day
    table_names = list(table_names_and_columns.keys())
    df_queries = break_up_query(table_names, start_datetime, end_datetime)
    # query log analytics, gets datetime splits for query row limit
    print_log("Querying Log Analytics...")
    query_results = []
    for each_query in df_queries.itertuples():
        each_table_name = each_query.table
        each_start_datetime = each_query.start_date
        each_end_datetime = each_query.end_date
        each_results_df = query_log_analytics(
            workspace_id,
            log_client,
            each_table_name,
            each_start_datetime,
            each_end_datetime,
            query_row_limit,
            query_row_limit_correction,
            add_row_counts,
        )
        # check status
        each_status = f"{each_table_name}: "
        each_status += f"{each_start_datetime} - {each_end_datetime} "
        each_status += f"-> {each_results_df.shape[0]} Queries"
        print_log(each_status)
        # skip empty results
        if each_results_df.shape[0] > 0:
            query_results.append(each_results_df)
    if not query_results:
        print_log("Error: No Query Messages Generated")
        return
    # combine all results; every per-day frame starts at index 0, so ignore_index
    # gives the combined frame unique labels for the Series-valued insert below
    results_df = pd.concat(query_results, ignore_index=True)
    # add column names
    column_names = results_df["Table"].apply(lambda x: table_names_and_columns[x])
    results_df.insert(loc=1, column="Columns", value=column_names)
    # add azure property columns
    results_df.insert(loc=2, column="Subscription", value=subscription_id)
    results_df.insert(loc=3, column="ResourceGroup", value=resource_group)
    results_df.insert(loc=4, column="LogAnalyticsWorkspace", value=worksapce_name)
    results_df.insert(loc=5, column="LogAnalyticsWorkspaceId", value=workspace_id)
    # one JSON message per job
    results = results_df.to_dict(orient="records")
    print_log(f"Sending {len(results)} Query Messages to Storage Queue...")
    failed_jobs = 0
    for each_job in results:
        try:
            queue_client.send_message(json.dumps(each_job))
        except Exception as e:
            # best effort: keep sending the remaining jobs, but count the failure
            failed_jobs += 1
            print_log(f"Unable to submit query: {each_job}, exception: {e}")
        finally:
            # throttle requests to the queue service
            time.sleep(request_wait_seconds)
    if failed_jobs:
        print_log(
            f"Warning: {failed_jobs} of {len(results)} Query Messages failed to send"
        )
    print_log(f"Queue Status: {queue_client.get_queue_properties()}")

## Authentication

In [None]:
# auth
# choose one way to sign in before DefaultAzureCredential is created in the Run section:
# 1. service principal: uncomment and fill in these environment variables
#    (avoid saving real secrets in the notebook)
# os.environ["AZURE_CLIENT_ID"] = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
# os.environ["AZURE_TENANT_ID"] = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
# os.environ["AZURE_CLIENT_SECRET"] = "XXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# 2. command line: interactive Azure CLI login (`!` runs it as a shell command in Jupyter)
!az login

## Inputs

In [None]:
# connections
# subscription, resource group and workspace name are only copied into each queue message
subscription_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
resource_group_name = "XXXXXXXXXXXXXXXXXXXXX"
log_analytics_worksapce_name = "XXXXXXXXXXX"
log_analytics_workspace_id = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
storage_queue_url = "https://XXXXXXXXXXXXXXX.queue.core.windows.net/XXXXXXXXXXXXXX"
# params
# tables to export with the columns to project; an empty list exports every column,
# and TimeGenerated is prepended when it is missing
table_names_and_columns = {
    "XXXXXXXXXXX_CL": [
        "TimeGenerated",
        "DataColumn1",
        "DataColumn2",
        "DataColumn3",
        "DataColumn4",
        "DataColumn5",
        "DataColumn6",
        "DataColumn7",
        "DataColumn8",
        "DataColumn9",
    ]
}
# export range is [start_datetime, end_datetime)
start_datetime = "2024-02-21 00:00:00"
end_datetime = "2024-02-22 00:00:00"
# max rows per query job (the Log Analytics API returns at most 500,000 rows per request)
query_row_limit = 500_000

## Run

In [None]:
# credential used for both the Log Analytics queries and the storage queue
credential = DefaultAzureCredential()

In [None]:
# split the export into query jobs and send them to the storage queue
query_log_analytics_send_to_queue(
    credential=credential,
    subscription_id=subscription_id,
    resource_group=resource_group_name,
    worksapce_name=log_analytics_worksapce_name,
    workspace_id=log_analytics_workspace_id,
    storage_queue_url=storage_queue_url,
    table_names_and_columns=table_names_and_columns,
    start_datetime=start_datetime,
    end_datetime=end_datetime,
    query_row_limit=query_row_limit,
)

## Verify

In [None]:
# preview queue messages: show up to 5 messages to check the job format
QueueClient.from_queue_url(storage_queue_url, credential).peek_messages(5)

# 3. Process Queue

## Setup

In [None]:
import json
import logging
import os
import time
from io import BytesIO, StringIO

import pandas as pd
import pyarrow
from azure.identity import DefaultAzureCredential
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
from azure.storage.blob import ContainerClient
from azure.storage.queue import QueueClient

In [None]:
# filesystem-safe timestamp for the log filename (date/time punctuation removed)
logging_timestamp = str(pd.Timestamp.today())
logging_timestamp = (
    logging_timestamp.replace("-", "").replace(":", "").replace(".", "").replace(" ", "")
)
# force=True replaces any handlers already attached to the root logger: without it
# basicConfig is a silent no-op once the root logger has a handler, so running this
# section in the same kernel as another one would keep logging to the old file
logging.basicConfig(
    filename=f"log-analytics-process-{logging_timestamp}.log",
    format="%(asctime)s %(levelname)s %(message)s",
    filemode="w",
    force=True,
)
logger = logging.getLogger()
logger.setLevel(logging.INFO)

In [None]:
def print_log(input: str) -> None:
    """Echo a message to stdout, then record it at INFO level through `logger`."""
    for emit in (print, logger.info):
        emit(input)


def query_log_qnalytics_request(
    workspace_id: str, client: LogsQueryClient, kql_query: str
) -> pd.DataFrame:
    """
    Makes API query request to log analytics and returns the first result table
    limits: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/api/timeouts
    API query limits:
        500,000 rows per request
        200 requests per 30 seconds
        max query time is 10 mins
        100MB data max per request
    Args:
        workspace_id: log analytics workspace id
        client: authenticated LogsQueryClient
        kql_query: KQL text; any time filter must be inside the query (timespan=None)
    Returns:
        pandas dataframe built from the first table of the response
    Raises:
        Exception: if the request fails, or completes with a non-success status
    """
    # keep the try narrow: if the call raises, `response` is never bound, so the
    # handler must report the caught error rather than response.status
    try:
        response = client.query_workspace(
            workspace_id=workspace_id,
            query=kql_query,
            timespan=None,
            server_timeout=600,
        )
    except Exception as e:
        raise Exception(f"Failed Request, Exception: {e}") from e
    if response.status != LogsQueryStatus.SUCCESS:
        # partial results expose the server-side error as `partial_error`
        partial_error = getattr(response, "partial_error", None)
        raise Exception(
            f"Unsucessful Request, Exception: {response.status}, {partial_error}"
        )
    table = response.tables[0]
    return pd.DataFrame(data=table.rows, columns=table.columns)


def query_log_analytics_get_results(
    workspace_id: str,
    client: LogsQueryClient,
    table_name: str,
    message_column_names: list[str],
    start_datetime: str,
    end_datetime: str,
) -> pd.DataFrame:
    """
    Fetches one query job's rows: the given columns of a table within
    [start_datetime, end_datetime)
        note: message_column_names must include TimeGenerated, because the time
        filter runs after the projection
        note: names and datetimes are inserted into the KQL text as-is, so they must
        come from a trusted source
    Returns:
        pandas dataframe with the projected columns
    """
    projection = ", ".join(message_column_names)
    kql_query = f"""
    let TABLE_NAME = "{table_name}";
    let START_DATETIME = datetime({start_datetime});
    let END_DATETIME = datetime({end_datetime});
    table(TABLE_NAME)
    | project {projection}
    | where (TimeGenerated >= START_DATETIME) and (TimeGenerated < END_DATETIME)
    """
    return query_log_qnalytics_request(workspace_id, client, kql_query)


def datetime_to_filename_safe(input: str) -> str:
    """
    Strips date/time punctuation so a timestamp can be used in a filename
    e.g. "2024-02-21T23:59:59.9982001Z" -> "202402212359599982001"
    """
    # every replacement deletes a single character, so one translate pass is equivalent
    return input.translate(str.maketrans("", "", "-:.TZ "))


def download_blob(
    filename: str,
    credential: DefaultAzureCredential,
    storage_blob_url_and_container: list[str],
) -> pd.DataFrame:
    """
    Downloads one exported file from Blob Storage and loads it into a dataframe
        note: credential requires the Storage Blob Data Contributor role
    Args:
        filename: blob name; the extension selects the parser:
            ".json" (JSON lines), ".csv", or ".parquet"
        credential: azure default credential object
        storage_blob_url_and_container: [storage account blob url, container name]
    Returns:
        pandas dataframe with the file contents
    Raises:
        Exception: if the file extension is not supported (checked before downloading)
    """
    # pick the parser first so an unsupported file is rejected without downloading it
    if filename.endswith(".json"):
        file_format = "json"
    elif filename.endswith(".csv"):
        file_format = "csv"
    elif filename.endswith(".parquet"):
        file_format = "parquet"
    else:
        raise Exception("file extension not supported")
    storage_blob_url, storage_container_name = storage_blob_url_and_container
    # close the container client (and its HTTP transport) once the blob is read
    with ContainerClient(
        storage_blob_url, storage_container_name, credential
    ) as container_client:
        downloaded_blob = container_client.get_blob_client(filename).download_blob()
        if file_format == "parquet":
            stream = BytesIO()
            downloaded_blob.readinto(stream)
        else:
            stream = StringIO(downloaded_blob.content_as_text())
    # readinto leaves the buffer positioned at its end; rewind before parsing
    stream.seek(0)
    if file_format == "json":
        return pd.read_json(stream, lines=True)
    if file_format == "csv":
        return pd.read_csv(stream)
    return pd.read_parquet(stream, engine="pyarrow")


def list_blobs_df(
    credential: DefaultAzureCredential, storage_blob_url_and_container: list[str]
) -> pd.DataFrame:
    """
    Lists every blob in a container
        note: credential requires the Storage Blob Data Contributor role
        note: also sets the global pandas option max_colwidth to None so long
        filenames are shown in full
    Args:
        credential: azure default credential object
        storage_blob_url_and_container: [storage account blob url, container name]
    Returns:
        dataframe with columns filename, file_size_mb (size / 1,000,000) and creation_time
    """
    pd.set_option("max_colwidth", None)
    account_url, container_name = storage_blob_url_and_container
    container_client = ContainerClient(account_url, container_name, credential)
    rows = [
        [blob.name, blob.size / 1_000_000, blob.creation_time]
        for blob in container_client.list_blobs()
    ]
    return pd.DataFrame(rows, columns=["filename", "file_size_mb", "creation_time"])


def _build_export_filename(
    table_name: str,
    subscription: str,
    resource_group: str,
    workspace_name: str,
    first_timestamp: pd.Timestamp,
    start_datetime,
    end_datetime,
) -> str:
    """
    Builds the blob path (without file extension) for one exported job
        note: mimics the folder layout of Log Analytics continuous export
            https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-data-export
        note: y/m/d/h partition folders come from first_timestamp; the file name is the
            filename-safe job start and end datetimes joined by "-"
    """
    return (
        f"{table_name}/"
        "WorkspaceResourceId=/"
        f"subscriptions/{subscription}/"
        f"resourcegroups/{resource_group}/"
        "providers/microsoft.operationalinsights/"
        f"workspaces/{workspace_name}/"
        f"y={first_timestamp.strftime('%Y')}/"
        f"m={first_timestamp.strftime('%m')}/"
        f"d={first_timestamp.strftime('%d')}/"
        f"h={first_timestamp.strftime('%H')}/"
        f"{datetime_to_filename_safe(start_datetime)}-"
        f"{datetime_to_filename_safe(end_datetime)}"
    )


def _serialize_results_df(
    results_df: pd.DataFrame, output_format: str
) -> "tuple[str, str | bytes]":
    """
    Serializes query results for upload, returns (file extension, payload)
        note: output_format must already be upper-case: "JSONL", "CSV", or "PARQUET"
        note: JSONL/CSV payloads are str, PARQUET payload is bytes
    """
    if output_format == "JSONL":
        # line-delimited json, iso timestamps at nanosecond precision
        payload = results_df.to_json(
            orient="records", lines=True, date_format="iso", date_unit="ns"
        )
        return ".json", payload
    if output_format == "CSV":
        return ".csv", results_df.to_csv(index=False)
    if output_format == "PARQUET":
        return ".parquet", results_df.to_parquet(index=False, engine="pyarrow")
    raise ValueError(f"File format not supported: {output_format}")


def process_queue(
    credential: DefaultAzureCredential,
    storage_queue_url: str,
    storage_blob_url_and_container: list[str],
    output_format: str = "JSONL",
    confirm_row_count: bool = True,
    message_visibility_timeout_seconds: int = 1000,
    azure_storage_connection_timeout_fix_seconds: int = 600,
    request_wait_seconds: float = 0.05,
) -> None:
    """
    Processes Log Analytics query jobs/messages from a storage queue and exports to Blob Storage
        note: credential requires Log Analytics Contributor, Storage Queue Data Contributor, and Storage Blob Data Contributor roles
        note: takes ~150 seconds for a query with 500k rows and 10 columns to csv (100 seconds for parquet)
        note: each message is json with keys Table, Columns, Subscription, ResourceGroup,
            LogAnalyticsWorkspace, LogAnalyticsWorkspaceId, StartTime, EndTime
            (and Count when confirm_row_count is True)
        note: a message is deleted only after its export is uploaded; on any failure it is
            left on the queue, so it is received again once its visibility timeout expires
        note: when no message is received, waits message_visibility_timeout_seconds + 60 seconds
            and returns only if the queue is still empty
    Args:
        credential: azure default credential object
        storage_queue_url: storage account queue url
            format: "https://{storage_account_name}.queue.core.windows.net/{queue_name}"
        storage_blob_url_and_container: storage account blob url and container name
            format: ["https://{storage_account_name}.blob.core.windows.net/", "{container_name}"]
        output_format: output file format, options = "JSONL", "CSV", "PARQUET" (case-insensitive)
            note: JSONL is json line delimited and is saved with a .json extension
        confirm_row_count: enables check if row count in message matches downloaded data
        message_visibility_timeout_seconds: seconds a received message stays hidden from other receivers
        azure_storage_connection_timeout_fix_seconds: upload blob file timeout in seconds
        request_wait_seconds: wait between http requests
    Returns:
        None
    Raises:
        Exception: "File format not supported" if output_format is not one of the options
    """
    # input validation (case-insensitive, e.g. "parquet" is accepted as "PARQUET")
    support_file_formats = ["JSONL", "CSV", "PARQUET"]
    output_format = str(output_format).upper()
    if output_format not in support_file_formats:
        raise Exception("File format not supported")
    # log analytics connection
    # note: need to add Log Analytics Contributor role
    log_client = LogsQueryClient(credential)
    # storage queue connection
    # note: need to add Storage Queue Data Contributor role
    queue_client = QueueClient.from_queue_url(storage_queue_url, credential)
    # storage blob connection
    # note: need to add Storage Blob Data Contributor role
    storage_blob_url, storage_container_name = storage_blob_url_and_container
    container_client = ContainerClient(
        storage_blob_url, storage_container_name, credential
    )
    # process messages from queue until empty
    while True:
        # get next message
        try:
            runtime_start = time.time()
            queue_message = queue_client.receive_message(
                visibility_timeout=message_visibility_timeout_seconds
            )
        except Exception as e:
            print_log(f"Unable to get queue message, {e}")
            continue
        finally:
            time.sleep(request_wait_seconds)
        # queue empty
        if not queue_message:
            # messages already received (by this or another worker) stay hidden until
            # their visibility timeout expires, so wait past it before deciding we are done
            print_log("Waiting for message visibility timeout...")
            time.sleep(message_visibility_timeout_seconds + 60)
            # check if queue still empty
            try:
                peek_message = queue_client.peek_messages()
                # exit program
                if not peek_message:
                    print_log("Run Complete - No Messages to Process")
                    break
            except Exception as e:
                print_log(f"Unable to peek queue message, {e}")
            finally:
                time.sleep(request_wait_seconds)
            continue
        # extract message
        print_log(f"Processing Message: {queue_message.content}")
        try:
            message_content = json.loads(queue_message.content)
            message_table_name = message_content["Table"]
            message_column_names = message_content["Columns"]
            message_subscription = message_content["Subscription"]
            message_resource_group = message_content["ResourceGroup"]
            message_log_analytics_name = message_content["LogAnalyticsWorkspace"]
            message_log_analytics_id = message_content["LogAnalyticsWorkspaceId"]
            message_start_datetime = message_content["StartTime"]
            message_end_datetime = message_content["EndTime"]
            if confirm_row_count:
                message_result_count = message_content["Count"]
        except Exception as e:
            print_log(f"Unable to extract json, {queue_message}, {e}")
            continue
        # query log analytics
        try:
            each_results_df = query_log_analytics_get_results(
                message_log_analytics_id,
                log_client,
                message_table_name,
                message_column_names,
                message_start_datetime,
                message_end_datetime,
            )
            print_log(
                f"Sucessfully Downloaded from Log Analytics: {each_results_df.shape}"
            )
            # row count check (message_result_count only exists when enabled)
            if confirm_row_count and each_results_df.shape[0] != message_result_count:
                print_log(f"Row count doesn't match expected, {queue_message}")
                continue
        except Exception as e:
            print_log(f"Unable to query log analytics, {queue_message}, {e}")
            continue
        finally:
            time.sleep(request_wait_seconds)
        # send to storage
        try:
            # an empty result has no first TimeGenerated to partition by; report it
            # explicitly instead of failing on iloc[0] with an opaque IndexError
            # NOTE(review): the message stays on the queue and will be retried,
            # decide whether empty jobs should be deleted instead
            if each_results_df.shape[0] == 0:
                print_log(
                    f"No rows returned from Log Analytics, nothing to upload, {queue_message}"
                )
                continue
            # datetime serialization is left to pandas' native export functions
            first_timestamp = each_results_df["TimeGenerated"].iloc[0]
            each_filename = _build_export_filename(
                message_table_name,
                message_subscription,
                message_resource_group,
                message_log_analytics_name,
                first_timestamp,
                message_start_datetime,
                message_end_datetime,
            )
            file_extension, each_output_file = _serialize_results_df(
                each_results_df, output_format
            )
            each_filename += file_extension
            # upload to storage
            # note: set undocumented param connection_timeout to avoid timeout errors
            # https://stackoverflow.com/questions/65092741/solve-timeout-errors-on-file-uploads-with-new-azure-storage-blob-package
            blob_client = container_client.get_blob_client(each_filename)
            blob_client.upload_blob(
                data=each_output_file,
                connection_timeout=azure_storage_connection_timeout_fix_seconds,
                overwrite=True,
            )
            print_log(f"Sucessfully Uploaded to Storage: {each_filename}")
        except Exception as e:
            print_log(f"Unable to upload to storage, {queue_message}, {e}")
            continue
        finally:
            time.sleep(request_wait_seconds)
        # remove message from queue (only after a successful upload)
        try:
            queue_client.delete_message(queue_message)
            runtime_end = time.time()
            print_log("Sucessfully Deleted Message from Queue")
            print_log(f"Runtime: {round(runtime_end-runtime_start, 1)} seconds")
        except Exception as e:
            print_log(f"Unable to delete message, {queue_message}, {e}")
        finally:
            time.sleep(request_wait_seconds)

## Authentication

In [None]:
# auth
# DefaultAzureCredential (created in the Run section) is expected to pick up one of these sign-ins:
# 1. service principal: set these environment variables before creating the credential
#    (do not commit real secrets to the notebook)
# os.environ["AZURE_CLIENT_ID"] = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
# os.environ["AZURE_TENANT_ID"] = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
# os.environ["AZURE_CLIENT_SECRET"] = "XXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# 2. command line: interactive Azure CLI login (IPython "!" shell escape, notebook only)
!az login

## Inputs

In [None]:
# connections (replace the XXX placeholders)
# queue format: "https://{storage_account_name}.queue.core.windows.net/{queue_name}"
storage_queue_url = (
    "https://XXXXXXXXXXXXXXXXXX.queue.core.windows.net/XXXXXXXXX"
)
# blob format: ["https://{storage_account_name}.blob.core.windows.net/", "{container_name}"]
storage_blob_url_and_container = ["https://XXXXXXXXXXXXXXXXX.blob.core.windows.net/", "XXXXXXXXXX"]

## Run

In [None]:
# one credential shared by the Log Analytics, Queue and Blob clients below
# (required roles are listed in the process_queue docstring)
credential = DefaultAzureCredential()

In [None]:
# export every queued job to blob storage; returns once the queue stays empty
# note: with default settings, an empty queue is re-checked after 1000 + 60 seconds before stopping
process_queue(credential, storage_queue_url, storage_blob_url_and_container)

## Verify

In [None]:
# list files
# one row per blob: filename, file_size_mb, creation_time
list_blobs_df(credential, storage_blob_url_and_container)

In [None]:
# read file
# load the most recently created blob to spot-check the export
file_list = list_blobs_df(credential, storage_blob_url_and_container)
if file_list.empty:
    raise ValueError("No files found in the storage container")
# sort newest first so row 0 is the most recent upload
# (an ascending sort would make iloc[0] the oldest file)
most_recent_filename = file_list.sort_values("creation_time", ascending=False)[
    "filename"
].iloc[0]
download_blob(most_recent_filename, credential, storage_blob_url_and_container)