
Commit

Modified the upload_df method to split the data into batches of 10,000 rows… (#633)

* Modified the upload_df method to split the data into batches of 10,000 rows and upload each batch to Log Analytics separately. This should help to improve the upload speed and avoid timeouts for large data sets.

* Fixing linting error

---------

Co-authored-by: jllangley <orig3n@localhost.com>
Co-authored-by: Ian Hellen <ianhelle@microsoft.com>
3 people committed Mar 17, 2023
1 parent e97deac commit 8bc586d
Showing 1 changed file with 18 additions and 22 deletions.
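For context before the diff: the change replaces the old size-based flush (posting whenever the accumulated JSON neared the API's ~30 MB request limit) with fixed-size row batches, as described in the commit message above. A minimal standalone sketch of that batching pattern follows; post_batch is a hypothetical stand-in for the uploader's internal _post_data call, not part of msticpy's API.

import json
from typing import Callable, Dict, List

import pandas as pd


def upload_in_batches(
    data: pd.DataFrame,
    post_batch: Callable[[str], None],
    batch_size: int = 10_000,
) -> None:
    """Illustrative only: post a DataFrame as JSON in fixed-size row batches."""
    events: List[Dict[str, str]] = []
    for i, (_, row) in enumerate(data.iterrows()):
        events.append(row.astype(str).to_dict())
        # Flush when a full batch has accumulated, or on the final row.
        if (i + 1) % batch_size == 0 or i == len(data) - 1:
            post_batch(json.dumps(events))
            events = []

Smaller, fixed-size requests keep each POST well under the service limit and help avoid timeouts on large data sets.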
40 changes: 18 additions & 22 deletions msticpy/data/uploaders/loganalytics_uploader.py
@@ -3,14 +3,13 @@
 # Licensed under the MIT License. See License.txt in the project root for
 # license information.
 # --------------------------------------------------------------------------
-"""LogAnayltics Uploader class."""
+"""LogAnalytics Uploader class."""
 import base64
 import datetime
 import hashlib
 import hmac
 import json
 import re
-import sys
 from pathlib import Path
 from typing import Any

@@ -58,23 +57,23 @@ def _build_signature(
         Parameters
         ----------
         date : str
-            datetime of authencation session.
+            datetime of authentication session.
         content_length : int
-            lenght of content to be passed to the api.
+            length of content to be passed to the api.
         method : str
             HTTP method being used.
         content_type : str
             Type of content being passed to the API.
         resource : str
-            The API endpoint being targetted.
+            The API endpoint being targeted.

         Returns
         -------
         str
             The encoded authorization string.

         """
-        x_headers = "x-ms-date:" + date
+        x_headers = f"x-ms-date:{date}"
         string_to_hash = "\n".join(
             [method, str(content_length), content_type, x_headers, resource]
         )
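The docstring above describes the SharedKey authorization string that _build_signature produces. For reference, a hedged end-to-end sketch of the same Data Collector API signing scheme, using placeholder workspace credentials (not real values):

import base64
import hashlib
import hmac

# Placeholder workspace ID and key for illustration only.
workspace_id = "00000000-0000-0000-0000-000000000000"
shared_key = base64.b64encode(b"not-a-real-key").decode()

date = "Fri, 17 Mar 2023 12:00:00 GMT"  # x-ms-date value (RFC 1123 format)
content_length = 512
string_to_hash = "\n".join(
    ["POST", str(content_length), "application/json", f"x-ms-date:{date}", "/api/logs"]
)
decoded_key = base64.b64decode(shared_key)
encoded_hash = base64.b64encode(
    hmac.new(decoded_key, string_to_hash.encode("utf-8"), hashlib.sha256).digest()
).decode()
authorization = f"SharedKey {workspace_id}:{encoded_hash}"
print(authorization)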
@@ -83,8 +82,7 @@ def _build_signature(
         encoded_hash = base64.b64encode(
             hmac.new(decoded_key, bytes_to_hash, digestmod=hashlib.sha256).digest()
         ).decode()
-        authorization = f"SharedKey {self.workspace}:{encoded_hash}"
-        return authorization
+        return f"SharedKey {self.workspace}:{encoded_hash}"

     def _post_data(self, body: str, table_name: str):
         """
@@ -107,7 +105,9 @@ def _post_data(self, body: str, table_name: str):

         resource = "/api/logs"
         content_type = "application/json"
-        rfc1123date = datetime.datetime.utcnow().strftime("%a, %d %b %Y %H:%M:%S GMT")
+        rfc1123date = datetime.datetime.now(datetime.timezone.utc).strftime(
+            "%a, %d %b %Y %H:%M:%S GMT"
+        )
         content_length = len(body)
         signature = self._build_signature(
             rfc1123date, content_length, "POST", content_type, resource
@@ -154,23 +154,19 @@ def upload_df(self, data: pd.DataFrame, table_name: Any, **kwargs):
             Pandas DataFrame to upload.
         table_name : str
             Custom table name to upload the data to.
+        batch_size : int
+            Custom number of rows to batch.

         """
+        batch_size = kwargs.get("batch_size", 10000)  # Set a default batch size
         events = []
-        for row in data.iterrows():
+        for i, row in enumerate(data.iterrows()):
             events.append(row[1].astype(str).to_dict())
-            # Due to 30MB limit if data is larger than 25Mb upload that chunk then continue
-            if sys.getsizeof(json.dumps(events)) > 26214400:
-                if self._debug is True:
-                    print("Data larger than 25MB spliting data requests.")
+            if (i + 1) % batch_size == 0 or i == len(data) - 1:
                 body = json.dumps(events)
                 self._post_data(body, table_name)
                 events = []

-        if events:
-            body = json.dumps(events)
-            self._post_data(body, table_name)
-
+                if self._debug:
+                    print(f"Uploaded batch {i // batch_size + 1}")
         if self._debug:
             print(f"Upload to {table_name} complete")

@@ -217,12 +213,12 @@ def upload_folder(
         t_name = bool(table_name)
         input_files = Path(folder_path).glob(ext)
         # pylint: disable=unnecessary-comprehension
-        input_files = [path for path in input_files]  # type: ignore
+        input_files = list(input_files)  # type: ignore
         # pylint: enable=unnecessary-comprehension
         progress = tqdm(total=len(list(input_files)), desc="Files", position=0)
         for path in input_files:
             data = pd.read_csv(path, delimiter=delim)
-            if t_name is False:
+            if not t_name:
                 table_name = path.stem
             self.upload_df(data, table_name)
             progress.update(1)
