## Overview
This notebook takes a KQL query and breaks it into batches that fit within the limits of the Azure Monitor API. This makes it possible to export more than the default 30,000-record/64 MB limits encountered when using the native interface or the API directly. The export runs the batches in parallel and writes the data to local disk in the format specified by the OUTPUT_FORMAT parameter (CSV or Parquet).
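
The batching idea can be sketched in a few lines of plain Python. This is a minimal illustration, not the notebook's implementation: `split_into_batches` is a hypothetical helper that walks backwards from the end of the time range in fixed-size windows and leaves a 1-microsecond gap between adjacent windows so that no record falls into two batches.

```python
from datetime import datetime, timedelta, timezone

def split_into_batches(start, end, batch_size):
    """Return (batch_start, batch_end) windows covering [start, end], newest first."""
    batches = []
    batch_end = end
    while batch_end > start:
        # Clamp the final (oldest) window to the start of the overall range.
        batch_start = max(start, batch_end - batch_size)
        batches.append((batch_start, batch_end))
        # Step just past this window so adjacent windows do not overlap.
        batch_end = batch_start - timedelta(microseconds=1)
    return batches

start = datetime(2025, 5, 1, tzinfo=timezone.utc)
end = datetime(2025, 5, 2, tzinfo=timezone.utc)
batches = split_into_batches(start, end, timedelta(hours=6))
for batch_start, batch_end in batches:
    print(batch_start.isoformat(), "->", batch_end.isoformat())
```

Each window would then be sent as its own query; a window that still exceeds the API limits can be retried with a smaller size.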
### Common Use Cases
- eDiscovery requests where a large number of rows need to be exported and sent to an external party.
- Investigations where a large number of indicators need to be exported for external analysis.
- Compliance and external archival scenarios where certain datasets need to be stored outside of Log Analytics.
- Data Science/Engineering scenarios where analysts need access to a large dataset in CSV or Parquet format for additional analytics outside of KQL.

### Requirements and Recommended Practices
- **Make sure you are running on a [Compute Instance](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-run-jupyter-notebooks?view=azureml-api-2#run-a-notebook-or-python-script), NOT Serverless Spark Compute (which is the default method for a new AML workspace), as that method is not currently supported.** Provision a Compute Instance with at least 4 cores. For larger datasets, you can increase the cores and memory further, just remember to update the ```JOBS``` parameter to match the number of cores as needed.
- The [DefaultAzureCredential Class](https://learn.microsoft.com/en-us/python/api/azure-identity/azure.identity.defaultazurecredential?view=azure-python) is used to authenticate to the Log Analytics workspace. This should automatically authenticate the user that is launching the Notebook in AML, assuming the default SSO option is enabled when provisioning the AML compute instance and that user has access to the Log Analytics workspace. Also, ensure you press the **_"Authenticate"_** button if you see the _"You need to be authenticated to the compute..."_ banner within AML Studio. A [Managed Identity](https://learn.microsoft.com/en-us/azure/machine-learning/how-to-create-compute-instance?view=azureml-api-2&tabs=azure-studio#assign-managed-identity) assigned to the AML compute instance can be used instead, but it needs to have access to the Log Analytics workspace.
- Use the ```project``` operator in the ```QUERY``` parameter in **Step 2** to limit the amount of data being exported. This will help speed up the overall process significantly, given the current Azure Monitor API limits and the low throughput it provides. As a benchmark, it took 7 minutes to export all fields and 2.5 Million rows from the SecurityEvent table (roughly 3.2GB of data) using a 32-core AML Compute Instance. This is why being efficient in only selecting the fields needed is important for larger datasets.
- Run each cell independently, and in order, to ensure each step runs without issue before moving on to the next. Later steps depend on the previous steps completing successfully.
- Review all of the parameters and their descriptions in **Step 2** to get a better sense of how to tune based on the dataset being exported.
- Check the logs.log file within the run directory for additional troubleshooting information.
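
When troubleshooting, it can help to pull only the warnings and errors out of the log file. The sketch below assumes the tab-separated layout (timestamp, logger name, level, message) produced by the formatter in Step 3. `find_problem_lines` is a hypothetical helper, and the sample log lines are made up for the demonstration.

```python
import tempfile
from pathlib import Path

def find_problem_lines(log_path, levels=("WARNING", "ERROR", "CRITICAL")):
    """Return (timestamp, logger_name, level, message) tuples for the given levels."""
    problems = []
    for line in Path(log_path).read_text(encoding="utf-8").splitlines():
        # Assumed layout: timestamp<TAB>logger name<TAB>level<TAB>message
        parts = line.split("\t", 3)
        if len(parts) == 4 and parts[2] in levels:
            problems.append(tuple(parts))
    return problems

# Demonstration against a throwaway file with invented log lines.
sample = (
    "2025-05-01 10:00:00\tForkPoolWorker-2-Job0\tINFO\tStarting new job.\n"
    "2025-05-01 10:00:05\tForkPoolWorker-2-Job0\tWARNING\tResponse ended prematurely, retrying.\n"
)
with tempfile.TemporaryDirectory() as tmp:
    log_file = Path(tmp) / "logs.log"
    log_file.write_text(sample, encoding="utf-8")
    problems = find_problem_lines(log_file)

for timestamp, name, level, message in problems:
    print(level, timestamp, name, message)
```

For a real run, pass the path to logs.log inside the run directory instead of the temporary file.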


## 1. Install Dependencies
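Run this cell to install the required Python libraries. Parquet output additionally requires a Parquet engine for pandas, such as pyarrow, to be available in the environment.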

In [None]:
import sys
!{sys.executable} -m pip install azure-monitor-query azure-identity pandas tqdm
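
After installing, a quick check can confirm that the libraries are importable before running the later steps. This is a minimal sketch using only the standard library; `missing_modules` is a hypothetical helper, not part of any of the installed packages.

```python
from importlib.util import find_spec

def missing_modules(module_names):
    """Return the names from module_names that cannot be located."""
    missing = []
    for name in module_names:
        try:
            found = find_spec(name) is not None
        except ModuleNotFoundError:
            # Raised for dotted names when a parent package is absent.
            found = False
        if not found:
            missing.append(name)
    return missing

required = ["azure.monitor.query", "azure.identity", "pandas", "tqdm"]
print("Missing modules:", missing_modules(required) or "none")
```

If anything is reported as missing, re-run the install cell or restart the kernel before continuing.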

## 2. Set Parameters
Modify the parameters below as necessary, then run the cell.

In [None]:
from datetime import datetime, timedelta, timezone

#Required parameters:
START_TIME = datetime(2025, 5, 1, tzinfo=timezone.utc) #Start time of the time range for the query.
END_TIME = datetime(2025, 5, 31, tzinfo=timezone.utc) #End time of the time range for the query.
QUERY = "SecurityEvent | project TimeGenerated, Account, Computer, EventID" #KQL query to run.

#If needed, change which Log Analytics workspace to use:
USE_DEFAULT_LAW_ID = True #If present, use the Log Analytics workspace ID that is present in the config.json file which gets created by Sentinel Notebooks.
LAW_ID = "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx" #Log Analytics workspace ID to use if config.json file is not present or USE_DEFAULT_LAW_ID is set to False.

#Optional parameters used for performance and output tuning:
JOBS = 4 #Number of jobs to run in parallel. Typically, this should match the number of cores of the VM. Because the Azure Monitor API can only run 5 concurrent queries at a time, there are diminishing returns after a certain point. Any value over 64 will revert to 64.
AUTO_BATCH = True #Attempts to automatically detect optimal batch size (time range) to use when breaking up the query.
BATCH_SIZE = timedelta(hours=6) #If AUTO_BATCH is set to False, this batch size (time range) will be used to break up the query.
MIN_BATCH_SIZE = timedelta(minutes=1) #If the data returned cannot fit within this time range, we skip and move to the next batch.
OUTPUT_DIRECTORY = "./law_export" #Directory where results will be stored. A new directory gets created for each run.
OUTPUT_FILE_PREFIX = "query_results" #Prefix used for the data files containing the query results.
OUTPUT_FORMAT = 'CSV' #File format used to store the query results on disk. CSV or PARQUET values are supported.
COMBINE_FORMAT = 'CSV' #File format used when combining files in Step 4. CSV or PARQUET values are supported.
COMBINE_MAX_ROWS = 500000 #Sets the max number of rows per file when combining in Step 4.
COMBINE_SORT = True #Sorts the data by the specified TIMESTAMP_FIELD when combining.
TIMESTAMP_FIELD = 'TimeGenerated' #Field to use for timestamp-based batching. TimeGenerated is default. For example, _OriginalTimeGenerated can be used for data restored via Search Job.
TG_START_TIME = datetime(2025, 5, 29, tzinfo=timezone.utc) #If using a timestamp field other than TimeGenerated, we still need to provide a time range for TimeGenerated in the request. If TIMESTAMP_FIELD is set to TimeGenerated, this parameter is ignored.
TG_END_TIME = datetime(2025, 6, 1, tzinfo=timezone.utc) #If using a timestamp field other than TimeGenerated, we still need to provide a time range for TimeGenerated in the request. If TIMESTAMP_FIELD is set to TimeGenerated, this parameter is ignored.
TIMEOUT = 3 #Number of minutes allowed before query times out. 10 minutes is max.
LOG_LEVEL = 'INFO' #Logging level. Supported values are the standard DEBUG, INFO, WARNING, ERROR, etc.
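
Because a mistyped parameter may only surface after the export has started, a small sanity check on the values above can save a failed run. The sketch below is an assumption about which checks are useful, not something the notebook performs itself; `check_parameters` is a hypothetical helper. It expects upper-case format names because the export step matches `'CSV'` and `'PARQUET'` exactly.

```python
from datetime import datetime, timedelta, timezone

def check_parameters(start_time, end_time, jobs, min_batch_size, output_format, timeout):
    """Return a list of human-readable problems; an empty list means none were found."""
    problems = []
    if start_time >= end_time:
        problems.append("START_TIME must be earlier than END_TIME.")
    if jobs < 1:
        problems.append("JOBS must be at least 1.")
    if min_batch_size <= timedelta(0):
        problems.append("MIN_BATCH_SIZE must be a positive duration.")
    if output_format not in ("CSV", "PARQUET"):
        problems.append("OUTPUT_FORMAT must be 'CSV' or 'PARQUET' (upper case).")
    if not 0 < timeout <= 10:
        problems.append("TIMEOUT must be greater than 0 and at most 10 minutes.")
    return problems

problems = check_parameters(
    start_time=datetime(2025, 5, 1, tzinfo=timezone.utc),
    end_time=datetime(2025, 5, 31, tzinfo=timezone.utc),
    jobs=4,
    min_batch_size=timedelta(minutes=1),
    output_format="CSV",
    timeout=3,
)
print(problems or "No problems found.")
```

In the notebook, the same call could be made with START_TIME, END_TIME, JOBS, MIN_BATCH_SIZE, OUTPUT_FORMAT, and TIMEOUT as arguments.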

## 3. Export Data
Run the below cell to start the export process. Data will be written to local files in the directory specified in the OUTPUT_DIRECTORY parameter.
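
When AUTO_BATCH is enabled, each job first runs a summarizing KQL query that estimates how many batches it needs from the row count and the average row size. The arithmetic of that query, mirrored in plain Python as a sketch: the constants are taken from the query text in the cell, while `estimate_number_of_batches` is a hypothetical name and the example inputs are rough figures derived from the SecurityEvent benchmark mentioned earlier.

```python
MAX_BATCH_BYTES = 38_400_000  # byte budget per batch used in the auto-batch query
MAX_BATCH_ROWS = 450_000      # row budget per batch used in the auto-batch query

def estimate_number_of_batches(row_count, avg_row_bytes):
    """Mirror the auto-batch arithmetic: the stricter of the byte and row limits wins."""
    if row_count == 0 or avg_row_bytes <= 0:
        return 1.0
    rows_per_batch_by_size = MAX_BATCH_BYTES / avg_row_bytes
    batches_by_bytes = row_count / rows_per_batch_by_size
    batches_by_rows = row_count / MAX_BATCH_ROWS
    # Never return fewer than one batch.
    return max(1.0, round(max(batches_by_bytes, batches_by_rows), 2))

# Roughly 2.5 million rows at about 1,280 bytes each (3.2 GB in total).
estimate = estimate_number_of_batches(row_count=2_500_000, avg_row_bytes=1280)
print("Estimated batches:", estimate)
```

The job's time range divided by this estimate gives the batch duration, which is then halved whenever a response still exceeds the API limits.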

In [None]:
from datetime import datetime, timedelta, timezone
import pandas as pd
import time
from azure.monitor.query import LogsQueryClient, LogsQueryStatus
from azure.core.exceptions import HttpResponseError
from azure.identity import DefaultAzureCredential
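import logging
import logging.handlers  # Needed explicitly for QueueHandler; "import logging" alone does not load this submodule.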
import os
import glob
import json
from multiprocessing import Pool, Manager, current_process, Queue
from tqdm import tqdm
from IPython.display import clear_output

completed_jobs = []
failed_jobs = []
errors = []
skipped_batches = []

time_format: str = "%m-%d-%Y %H-%M-%S"

class time_range_class:
    def __init__(self, name, start_time, end_time):
        self.name = name
        self.start_time = start_time
        self.end_time = end_time

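def get_time_ranges(start_time=None, end_time=None, number_of_ranges=5):
    # Resolve defaults at call time (the last 24 hours). Defaults written in the
    # signature are evaluated only once, when the function is defined.
    if end_time is None: end_time = datetime.now(timezone.utc)
    if start_time is None: start_time = end_time - timedelta(hours=24)
    ranges = []
    interval = (end_time - start_time) / number_of_ranges

    for i in range(number_of_ranges):
        # Ranges are built backwards from end_time. Every range after the first
        # ends 1 microsecond early so that adjacent ranges do not overlap.
        delta = timedelta(microseconds=1) if i > 0 else timedelta(0)
        range_start = end_time - ((i + 1) * interval)
        range_end = (end_time - (i * interval)) - delta
        ranges.append(time_range_class(f"Job{i}", range_start, range_end))

    return ranges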

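def read_config_values(file_path):
    # Return the workspace ID from config.json, or None if the file is missing,
    # unreadable, not valid JSON, or lacks the "workspace_id" key.
    try:
        with open(file_path) as json_file:
            json_config = json.load(json_file)
            return json_config["workspace_id"]
    except (OSError, ValueError, KeyError, TypeError):
        return None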

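def write_to_file(df, export_path, prefix, format):
    # Write one batch to disk. format must be 'CSV' or 'PARQUET' (upper case).
    match format:
        case 'PARQUET':
            path = os.path.join(export_path, f"{prefix}.parquet")
            df.to_parquet(path)
        case 'CSV':
            path = os.path.join(export_path, f"{prefix}.csv")
            df.to_csv(path, index=False)
        case _:
            # Fail loudly instead of silently writing nothing.
            raise ValueError(f"Unsupported output format: {format!r}. Use 'CSV' or 'PARQUET'.")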
    
def get_batch_size(query, law_id, start_time, end_time, timeout, timestamp_field, tg_start_time, tg_end_time):
    batch_query_summarize = ("| summarize NumberOfBatchesBytes = 38400000 / avg(estimate_data_size(*)), NumberOfBatchesRows = count()"
    "| project NumberOfBatchesBytes = todecimal(NumberOfBatchesRows / NumberOfBatchesBytes), NumberOfBatchesRows = todecimal(NumberOfBatchesRows) / todecimal(450000)"
    "| project NumberOfBatches = round(iff(NumberOfBatchesBytes > NumberOfBatchesRows, NumberOfBatchesBytes, NumberOfBatchesRows), 2)"
    "| project NumberOfBatches = iif(NumberOfBatches < toreal(1), toreal(1), NumberOfBatches)")
  
    if timestamp_field != 'TimeGenerated':
        batch_query_where = (f"{query} | where {timestamp_field} between (todatetime('{start_time}') .. todatetime('{end_time}'))")
        batch_query = (f"{batch_query_where} {batch_query_summarize}")
        response = client.query_workspace(workspace_id=law_id, query=batch_query, timespan=(tg_start_time, tg_end_time), timeout=timeout)
    else:
        batch_query = (f"{query} {batch_query_summarize}")
        response = client.query_workspace(workspace_id=law_id, query=batch_query, timespan=(start_time, end_time), timeout=timeout)

    if response.status == LogsQueryStatus.SUCCESS:
        data = response.tables
    else:
        error = response.partial_error
        raise Exception(error.details[0]["innererror"])
    for table in data:
        df = pd.DataFrame(data=table.rows, columns=table.columns)
        
    return df['NumberOfBatches'].iloc[0]

def define_logger(name, log_level="INFO"):
    logger = logging.getLogger(name)
    logger.addHandler(logging.handlers.QueueHandler(log_queue))
    logger.setLevel(log_level)
    logger.handlers[0].setFormatter(logging.Formatter('%(asctime)s\t%(name)s\t%(levelname)s\t%(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
    return logger
           
def export_log_analytics_data(
    law_id: str,
    query: str,
    start_time: datetime = None,
    end_time: datetime = None,
    batch_size: timedelta = timedelta(hours=4),
    job_name: str = None,
    status_queue = None,
    log_queue = None,
    min_batch_size: timedelta = timedelta(minutes=15),
    client: LogsQueryClient = None,
    export_path = '',
    export_prefix = 'query_results',
    auto_batch = True,
    export_format: str = 'CSV',
    timeout: int = 10,
    timestamp_field = 'TimeGenerated',
    tg_start_time: datetime = None,
    tg_end_time: datetime = None,
    log_level = "INFO",
    delay: int = 0,
    max_retries: int = 5,
    export_to_file: bool = True,
    json_depth: int = 10,
    ):

    time_range: timedelta = end_time - start_time
    error_count: int = 0
    initial_batch_size: timedelta = batch_size
    batch_count: timedelta = timedelta()
    current_count: int = 0
    percent_complete: int = 0
    stop_time: datetime = start_time
    runs_without_error_count: int = 0
    loop_done: bool = False
    rows_returned: int = 0
    results = []

    logger = define_logger(name=f"{current_process().name}-{job_name}", log_level=log_level)
    
    logger.info(f"Starting new job between {start_time.strftime(time_format)} and {end_time.strftime(time_format)}.")

    if auto_batch == True: 
        try:
            batch_size = time_range / get_batch_size(query, law_id, start_time, end_time, timeout=timeout, timestamp_field=timestamp_field,tg_start_time=tg_start_time, tg_end_time=tg_end_time)
            logger.info(f"Calculated auto-batch size of: {batch_size}")
        except Exception as err:
            logger.error(f"Unable to auto-batch, please check your query and the log for more info. You can disable auto-batch via the AUTO_BATCH parameter if the dataset is too large to calculate. {type(err)} {err}")
            return ({'job_name': job_name, 'status': 'error'})
    else:
        logger.info(f"Using manual batch size of: {batch_size}.")

    if batch_size > time_range: batch_size = time_range

    while error_count <= max_retries:
        try:
            while loop_done == False:
        
                if batch_size < initial_batch_size and runs_without_error_count > 5:
                    batch_size *= 2
                    logger.info(f"Increasing batch size to {batch_size}.")
                
                start_time = end_time - batch_size

                if start_time <= stop_time:
                    start_time = stop_time
                    batch_size = end_time - start_time
                    loop_done = True
                
                logger.info(f"Running query between {start_time.strftime(time_format)} and {end_time.strftime(time_format)}.")

                if timestamp_field != 'TimeGenerated':
                    batch_query = (f"{query} | where {timestamp_field} between (todatetime('{start_time}') .. todatetime('{end_time}'))")
                    response = client.query_workspace(workspace_id=law_id, query=batch_query, timespan=(tg_start_time, tg_end_time), timeout=timeout)
                else:
                    response = client.query_workspace(workspace_id=law_id, query=query, timespan=(start_time, end_time), timeout=timeout)
                
                if response.status == LogsQueryStatus.SUCCESS:
                    data = response.tables
                else:
                    error = response.partial_error
                    raise Exception(error.details[0]["innererror"])

                file_prefix = f"{export_prefix}_{start_time.strftime(time_format)}"
                for table in data:
                    df = pd.DataFrame(data=table.rows, columns=table.columns)
                    if df.shape[0] > 0: write_to_file(df, export_path, file_prefix, export_format)
                    else: logger.info(f"No data returned for {start_time.strftime(time_format)} and {end_time.strftime(time_format)}, skipping writing to disk.")
                
                batch_count += batch_size
                percent_complete_previous = percent_complete
                percent_complete = round((batch_count / time_range) * 100)
                logger.info(f"Received {df.shape[0]} rows of data and wrote to {file_prefix}.{export_format.lower()}. Percent Complete: {percent_complete}")
                status_queue.put({'job_name': job_name, 'event': 'progress_update', 'message': (percent_complete - percent_complete_previous)})

                rows_returned += int(df.shape[0])

                runs_without_error_count += 1
                end_time = start_time + timedelta(microseconds=-1)
                time.sleep(delay)
            
            logger.info(f"Finished exporting {rows_returned} records from Log Analytics. Percent Complete: 100")
            status_queue.put({'job_name': job_name, 'event': 'progress_update', 'message': (100 - percent_complete)})
            close_logger(logger)

            return ({'job_name': job_name, 'status': 'success', 'rows_returned_total': rows_returned})
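        except Exception as err:
            # Make sure the failed batch is re-run on retry. Without this, an error on the
            # final batch leaves loop_done set and the job reports success without that batch.
            loop_done = False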
            if "Response ended prematurely" in str(err):
                logger.warning(f"Response ended prematurely, retrying.")
                logger.debug(f"{type(err)} {err}")
            elif ("A recognition error occurred in the query") in str(err) or "A semantic error occurred" in str(err) or "The requested path does not exist" in str(err):
                logger.error(f"There is likely an error in the query or the workspace ID. {type(err)} {err}")
                return ({'job_name': job_name, 'status': 'error'})
            elif ("Maximum response size of 100000000 bytes exceeded" in str(err) 
            or 'The results of this query exceed the set limit of 64000000 bytes' in str(err) 
            or 'The results of this query exceed the set limit of 500000 records' in str(err)):
                runs_without_error_count = 0
                if batch_size == min_batch_size:
                    status_queue.put({'job_name': job_name, 'event': 'skipped_batch', 'message': f"{start_time.strftime(time_format)} - {end_time.strftime(time_format)}"})
                    logger.warning(f"Results cannot be returned in the specified minimum batch size. Skipping batch between {start_time.strftime(time_format)} and {end_time.strftime(time_format)}.")
                    logger.debug(f"{type(err)} {err}")
                    batch_count += batch_size
                    end_time = start_time + timedelta(microseconds=-1)
                    loop_done = False
                else:
                    batch_size = batch_size / 2
                    if batch_size < min_batch_size:
                        batch_size = min_batch_size
                    logger.info(f"Request was too large, reduced batch size to: {batch_size}.")
                    logger.debug(f"{type(err)} {err}")
                    loop_done = False
            else:
                logger.error(f"Unhandled Error: {type(err)} {err}")
                error_count += 1
                if error_count > max_retries:
                    logger.error("Max number of retries reached, exiting.")
                    return ({'job_name': job_name, 'status': 'error'})

def log_result(result):
    global completed_jobs
    global failed_jobs
    if result['status'] == 'success':
        completed_jobs.append(result)
    else:
        logger.error(f"{result['job_name']} has failed. Please check log file for details.")
        failed_jobs.append(result)

def log_error(error):
    logger.error(error)

def logger_process(queue, job_directory, log_level):
    message: logging.LogRecord
    logger = logging.getLogger('logger_process')
    logger.addHandler(logging.FileHandler(f"{job_directory}/logs.log"))
    logger.handlers[0].setLevel(log_level)
    while True:
        message = queue.get()
        if message is None:
            break
        if message.levelno >= 40:
            status_queue.put({'job_name': message.processName, 'event': 'error', 'message': message})
        logger.handle(message)
    close_logger(logger)

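def close_logger(logger):
    # Iterate over a copy: removing handlers while iterating the live list would skip some of them.
    for handler in list(logger.handlers):
        logger.removeHandler(handler)
        handler.close()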

def get_workspace():
    workspace_id = read_config_values('config.json')

    if workspace_id is None or not USE_DEFAULT_LAW_ID:
        if LAW_ID != "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx":
            law_id = LAW_ID
        else:
            raise Exception("Please specify a valid Log Analytics workspace ID in the Parameters cell.")
    else:
        law_id = workspace_id

    return law_id

def make_directory():
    if not os.path.exists(OUTPUT_DIRECTORY): os.makedirs(OUTPUT_DIRECTORY)
    job_directory = f"{OUTPUT_DIRECTORY}/{datetime.now().strftime(time_format)}"
    os.mkdir(job_directory)
    
    return job_directory

clear_output(wait=True)

job_directory = make_directory()
if JOBS > 64: JOBS = 64
ranges = get_time_ranges(start_time=START_TIME, end_time=END_TIME, number_of_ranges=JOBS )
law_id = get_workspace()    

credential = DefaultAzureCredential()
client = LogsQueryClient(credential)

pbar = tqdm(total=JOBS * 100, leave=True, position=0, desc=f"Splitting query into {JOBS} jobs for parallel processing", postfix={'Errors': 0, 'Skipped Batches': 0})

with Manager() as manager:
    status_queue = manager.Queue()
    log_queue = manager.Queue()
    last_queue_time = datetime.now()

    with Pool() as pool:
        logger = define_logger(name=current_process().name, log_level=LOG_LEVEL)
        logger.info(f"Starting logger process.")
        pool_logger_results = pool.apply_async(logger_process, args=(log_queue, job_directory, LOG_LEVEL), error_callback=log_error)
        logger.info(f"Starting {len(ranges)} query jobs between {START_TIME.strftime(time_format)} and {END_TIME.strftime(time_format)}.")
        pool_export_results = [pool.apply_async(export_log_analytics_data, [law_id, QUERY, i.start_time, i.end_time, BATCH_SIZE, i.name, status_queue, log_queue, MIN_BATCH_SIZE, client, job_directory, OUTPUT_FILE_PREFIX, AUTO_BATCH, OUTPUT_FORMAT, TIMEOUT, TIMESTAMP_FIELD, TG_START_TIME, TG_END_TIME, LOG_LEVEL], callback=log_result, error_callback=log_error) for  i in ranges]
        while (len(completed_jobs) + len(failed_jobs)) < JOBS or not status_queue.empty():
            if not status_queue.empty():
                item = status_queue.get()
                last_queue_time = datetime.now()
                match item['event']:
                    case 'progress_update': 
                        pbar.update(item['message'])
                    case 'skipped_batch': 
                        skipped_batches.append(item)
                        pbar.set_postfix(ordered_dict={'\033[91mErrors': len(errors), 'Skipped Batches': len(skipped_batches)})
                    case 'error':
                        errors.append(item)
                        pbar.set_postfix(ordered_dict={'\033[91mErrors': len(errors), 'Skipped Batches': len(skipped_batches)})
            if datetime.now() - last_queue_time > timedelta(minutes=TIMEOUT):
                log_message = f"No input received from running job(s) for more than {TIMEOUT} minutes, check log for errors. Exiting."
                logger.error(log_message)
                pbar.set_description(log_message)
                break
        if len(completed_jobs) > 0:
            log_message = f"Completed export of {sum([item['rows_returned_total'] for item in completed_jobs])} records to {job_directory}/."
            logger.info(log_message)
            pbar.set_description(log_message)
        else:
            pbar.set_description_str(f"No jobs completed successfully. Please check log file in {job_directory} for details.")
        log_queue.put(None)
        pool.close()
        pool.join()
        close_logger(logger)

time.sleep(2)

pbar.clear()
pbar.close()

time.sleep(2)

if skipped_batches:
    print("\nThe below batches were skipped. Try lowering the MIN_BATCH_SIZE parameter or reduce the size of the dataset. Review log file for more details.\n")
    for item in skipped_batches: print(item['message'])
   
if errors:
    print("\nThe below errors were encountered. Review log file for more details.\n")
    for item in errors: print(item['message'].message)

## 4. Combine Files (Optional)
Merges rows from individual data files into one or more files based on the COMBINE_MAX_ROWS parameter. Ensure you have enough memory allocated in your compute instance to account for the size of the exported dataset.
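
The number of combined files follows directly from the total row count and COMBINE_MAX_ROWS. A minimal sketch of that slicing arithmetic, with `chunk_bounds` as a hypothetical helper and the row counts chosen only for illustration:

```python
def chunk_bounds(total_rows, max_rows):
    """Return (start, stop) row offsets, one pair per combined output file."""
    if max_rows < 1:
        raise ValueError("max_rows must be at least 1")
    return [(start, min(start + max_rows, total_rows))
            for start in range(0, total_rows, max_rows)]

bounds = chunk_bounds(total_rows=1_200_000, max_rows=500_000)
print(len(bounds), "file(s):", bounds)
```

Note that the whole dataset is held in memory while it is concatenated and sorted, regardless of how many files it is split into afterwards.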

In [None]:
def combine_files(files, output_prefix, job_directory, output_format, combine_format, sort_column, sort, max_rows=50000):
    export_path = f'{job_directory}/combined'
    if not os.path.exists(export_path): os.mkdir(export_path)
        
    if output_format.lower() == 'csv':
        df = [pd.read_csv(file, low_memory=False) for file in tqdm(files, position=0, desc="Reading files")]
    if output_format.lower() == 'parquet':
        df = [pd.read_parquet(file) for file in tqdm(files, position=0, desc="Reading files")]
    file_index = 1
    row_index = 0
    
    pbar_combine = tqdm(leave=True, position=0, desc="Concatenating files, please wait")
    df = pd.concat(df, ignore_index=True)
    if sort: df.sort_values(by=[sort_column], inplace=True)
    total_rows = df.shape[0]
    pbar_combine.total = total_rows
    while row_index < total_rows:
        pbar_combine.set_description(f'Writing max of {max_rows} rows to each file')
        if combine_format.lower() == 'csv':
            df[row_index:(row_index + max_rows)].to_csv(f'{export_path}/{output_prefix}_combined{file_index}.csv', index=False)
        if combine_format.lower() == 'parquet':
            df[row_index:(row_index + max_rows)].to_parquet(f'{export_path}/{output_prefix}_combined{file_index}.parquet', index=False)
        row_index += (max_rows)
        file_index += 1
        
        if row_index >= total_rows:  # Also covers the case where the last file is exactly full.
            pbar_combine.update(total_rows - (row_index - (max_rows)))
            pbar_combine.set_description(f'Completed combining {total_rows} rows into {file_index - 1} file(s), located in {export_path}/.')
        else:
            pbar_combine.update(max_rows) 
    pbar_combine.clear()
    pbar_combine.close()

clear_output(wait=True)
if OUTPUT_FORMAT.lower() in ('csv', 'parquet'):
    files = glob.glob(job_directory + '/*.{}'.format(OUTPUT_FORMAT.lower()))
    sort_column = TIMESTAMP_FIELD  # Sort on the same field that was used for batching.
    if files: combine_files(files=files, output_prefix=OUTPUT_FILE_PREFIX, output_format=OUTPUT_FORMAT, job_directory=job_directory, combine_format=COMBINE_FORMAT, max_rows=COMBINE_MAX_ROWS, sort_column=sort_column, sort=COMBINE_SORT)
    else: print("No files found.")
else: print("Please set OUTPUT_FORMAT parameter value to either CSV or PARQUET. Exiting.")

## 5. Delete Data (Optional)
Run the below cell to DELETE all run data including logs and data files.
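
Before deleting, it may be worth checking how much data is about to be removed. The sketch below is one way to do that with the standard library. `directory_size_bytes` is a hypothetical helper, and the demonstration runs against a throwaway temporary directory rather than a real export.

```python
import os
import tempfile

def directory_size_bytes(path):
    """Sum the sizes of all regular files under path."""
    total = 0
    for root, _dirs, files in os.walk(path):
        for name in files:
            total += os.path.getsize(os.path.join(root, name))
    return total

# Demonstration against a throwaway directory with one small file.
with tempfile.TemporaryDirectory() as tmp:
    run_dir = os.path.join(tmp, "run1")
    os.mkdir(run_dir)
    with open(os.path.join(run_dir, "query_results.csv"), "wb") as f:
        f.write(b"a,b\n1,2\n")
    size = directory_size_bytes(tmp)

print(f"{size} bytes would be deleted")
```

In the notebook, the same function could be called with OUTPUT_DIRECTORY to see the total size across all runs.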

In [None]:
import shutil
from IPython.display import clear_output

confirmation = input(f"Are you sure you want to delete the directory {OUTPUT_DIRECTORY} and all of its contents? (Y/N)")

clear_output(wait=True)

if confirmation.lower() in ('y', 'yes'):
    try:
        print("Deleting data...")
        shutil.rmtree(OUTPUT_DIRECTORY)
        print("Data has been deleted.")
    except Exception as err:
        print(f"Error deleting data: {err}")
else:
    print('Operation has been aborted.')