(feature): query dataset to return pandas df
Rambatino committed Feb 13, 2024
1 parent 83835c9 commit d0100b9
Showing 12 changed files with 369 additions and 87 deletions.
154 changes: 134 additions & 20 deletions axiom/client.py
@@ -1,24 +1,34 @@
"""Client provides an easy-to use client library to connect to Axiom."""

import ndjson
import dacite
import gzip
import ujson
import rfc3339
import os
from .util import Util
from enum import Enum
from humps import decamelize
from typing import Optional, List, Dict, Any, Tuple
from logging import getLogger
from dataclasses import dataclass, field, asdict
from datetime import datetime
from datetime import datetime, timedelta
from requests_toolbelt.sessions import BaseUrlSession
from requests_toolbelt.utils.dump import dump_response
from requests.adapters import HTTPAdapter, Retry
from .datasets import DatasetsClient
from .query import QueryLegacy, QueryResult, QueryOptions, QueryLegacyResult, QueryKind
from .query import (
QueryLegacy,
LegacyQueryResult,
QueryOptions,
QueryLegacyResult,
QueryKind,
TabularQueryResult,
)
from .users import UsersClient
from .__init__ import __version__

import concurrent.futures
import pandas as pd

AXIOM_URL = "https://api.axiom.co"

@@ -71,6 +81,7 @@ class AplResultFormat(Enum):
"""The result format of an APL query."""

Legacy = "legacy"
Tabular = "tabular"


class ContentType(Enum):
@@ -101,6 +112,8 @@ class AplOptions:
no_cache: bool = field(default=False)
save: bool = field(default=False)
format: AplResultFormat = field(default=AplResultFormat.Legacy)
include_cursor: bool = field(default=False)
    cursor: Optional[str] = field(default=None)


def raise_response_error(r):
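The two new AplOptions fields wire cursor-based pagination into APL queries. A minimal sketch of how they combine with the tabular format; the cursor value is made up, and result.status.max_cursor is the attribute the new df() method further down reads it from:

    # sketch: options for a cursor-paginated tabular query
    opts = AplOptions(
        no_cache=True,
        format=AplResultFormat.Tabular,
        include_cursor=True,
    )
    # after consuming a page, resume from the last cursor the server returned
    # (exposed as result.status.max_cursor in the df() implementation below)
    opts.cursor = "1707818878.041703"  # illustrative, made-up cursor value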
@@ -180,22 +193,22 @@ def ingest(
self,
dataset: str,
payload: bytes,
contentType: ContentType,
content_type: ContentType,
enc: ContentEncoding,
opts: Optional[IngestOptions] = None,
) -> IngestStatus:
"""Ingest the events into the named dataset and returns the status."""
path = "datasets/%s/ingest" % dataset

# check if passed content type and encoding are correct
if not contentType:
if not content_type:
            raise ValueError("unknown content-type, choose one of json, x-ndjson or csv")

if not enc:
raise ValueError("unknown content-encoding")

# set headers
headers = {"Content-Type": contentType.value, "Content-Encoding": enc.value}
headers = {"Content-Type": content_type.value, "Content-Encoding": enc.value}
# prepare query params
params = self._prepare_ingest_options(opts)

@@ -225,10 +238,10 @@ def query_legacy(
self, id: str, query: QueryLegacy, opts: QueryOptions
) -> QueryLegacyResult:
"""Executes the given query on the dataset identified by its id."""
if not opts.saveAsKind or (opts.saveAsKind == QueryKind.APL):
if not opts.save_as_kind or (opts.save_as_kind == QueryKind.APL):
raise WrongQueryKindException(
"invalid query kind %s: must be %s or %s"
% (opts.saveAsKind, QueryKind.ANALYTICS, QueryKind.STREAM)
% (opts.save_as_kind, QueryKind.ANALYTICS, QueryKind.STREAM)
)

path = "datasets/%s/query" % id
@@ -240,14 +253,18 @@ def query_legacy(
self.logger.debug(f"query result: {result}")
query_id = res.headers.get("X-Axiom-History-Query-Id")
self.logger.info(f"received query result with query_id: {query_id}")
result.savedQueryID = query_id
result.saved_query_id = query_id
return result

def apl_query(self, apl: str, opts: Optional[AplOptions] = None) -> QueryResult:
def apl_query(
self, apl: str, opts: Optional[AplOptions] = None
) -> LegacyQueryResult | TabularQueryResult:
"""Executes the given apl query on the dataset identified by its id."""
return self.query(apl, opts)

def query(self, apl: str, opts: Optional[AplOptions] = None) -> QueryResult:
def query(
self, apl: str, opts: Optional[AplOptions] = None
) -> LegacyQueryResult | TabularQueryResult:
"""Executes the given apl query on the dataset identified by its id."""
path = "datasets/_apl"
payload = ujson.dumps(
@@ -257,24 +274,97 @@ def query(self, apl: str, opts: Optional[AplOptions] = None) -> QueryResult:
self.logger.debug("sending query %s" % payload)
params = self._prepare_apl_options(opts)
res = self.session.post(path, data=payload, params=params)
result = Util.from_dict(QueryResult, res.json())
        result = Util.from_dict(
            (
                TabularQueryResult
                if opts is not None and opts.format == AplResultFormat.Tabular
                else LegacyQueryResult
            ),
            res.json(),
        )
self.logger.debug(f"apl query result: {result}")
query_id = res.headers.get("X-Axiom-History-Query-Id")
self.logger.info(f"received query result with query_id: {query_id}")
result.savedQueryID = query_id
result.saved_query_id = query_id
return result
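How a tabular result is consumed can be read off the df() implementation that follows: each entry in result.tables carries fields (each with a name) and column-oriented columns. A minimal sketch, assuming an authenticated client and a dataset named "my-dataset":

    result = client.query(
        "['my-dataset'] | where status == 500 | sort by _time asc",
        AplOptions(
            start_time=datetime(2024, 2, 12),
            end_time=datetime(2024, 2, 13),
            format=AplResultFormat.Tabular,
        ),
    )
    table = result.tables[0]
    names = [f.name for f in table.fields]  # column names
    first_column = table.columns[0]         # all values for the first field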

def df(
self,
dataset_name: str,
start_time: datetime,
end_time: datetime,
apl: str = "sort by _time asc",
):
"""
Query the dataset for a specified time range and return the result as a DataFrame.
Args:
dataset_name (str): Name of the dataset to query.
start_time (datetime): Start time of the query range.
end_time (datetime): End time of the query range.
apl (str, optional): APL query to execute. Defaults to '| sort by _time asc'. N.B. the apl *must*
include a | sort by _time asc for this to function correctly.
Returns:
pd.DataFrame: Result of the query as a pandas DataFrame.
"""
times = self._chunk_time_range(start_time, end_time)
apl = f"['{dataset_name}'] | {apl}"

        def loop_query_until_finished(time_chunk: Tuple[datetime, datetime]):
opts = AplOptions(
no_cache=True,
save=False,
start_time=time_chunk[0],
end_time=time_chunk[1],
include_cursor=True,
format=AplResultFormat.Tabular,
)
result = self.query(apl, opts)
table = result.tables[0]
df = pd.DataFrame()
if len(table.columns) == 0 or len(table.columns[0]) == 0:
return df

            while table.columns[0]:
                # the tabular result is column-oriented; transpose it into rows
                new_df = pd.DataFrame([c for c in table.columns]).T
                new_df.columns = [f.name for f in table.fields]
                df = pd.concat([df, new_df], ignore_index=True)
                # a page shorter than 1000 rows means the range is exhausted
                if new_df.shape[0] != 1000:
                    break
                # otherwise resume from the last cursor and fetch the next page
                opts.cursor = result.status.max_cursor
                result = self.query(apl, opts)
                table = result.tables[0]
df["_time"] = pd.to_datetime(df["_time"])
df["_sysTime"] = pd.to_datetime(df["_sysTime"])
return df

results = []
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
futures = [
executor.submit(loop_query_until_finished, chunk) for chunk in times
]
for future in concurrent.futures.as_completed(futures):
try:
results.append(future.result())
except Exception as e:
for f in futures:
f.cancel()
raise e

return pd.concat(results)
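A usage sketch for the new method. The dataset name and APL filter are illustrative, and the no-argument Client() construction (picking the token up from the environment) is an assumption not shown in this diff:

    from datetime import datetime, timedelta
    from axiom import Client  # assumed import path

    client = Client()  # assumed to read AXIOM_TOKEN from the environment
    df = client.df(
        "my-dataset",
        start_time=datetime.utcnow() - timedelta(hours=2),
        end_time=datetime.utcnow(),
        apl="where status == 500 | sort by _time asc",
    )
    print(df.shape, df["_time"].min(), df["_time"].max())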

def _prepare_query_options(self, opts: QueryOptions) -> Dict[str, Any]:
"""returns the query options as a Dict, handles any renaming for key fields."""
if opts is None:
return {}
params = {}
if opts.streamingDuration:
if opts.streaming_duration:
params["streaming-duration"] = (
opts.streamingDuration.seconds.__str__() + "s"
opts.streaming_duration.seconds.__str__() + "s"
)
if opts.saveAsKind:
params["saveAsKind"] = opts.saveAsKind.value
if opts.save_as_kind:
params["saveAsKind"] = opts.save_as_kind.value

params["nocache"] = opts.nocache.__str__()

@@ -324,8 +414,32 @@ def _prepare_apl_payload(

if opts is not None:
if opts.start_time:
params["startTime"] = opts.start_time
params["startTime"] = rfc3339.format(opts.start_time)
if opts.end_time:
params["endTime"] = opts.end_time

params["endTime"] = rfc3339.format(opts.end_time)
if opts.include_cursor:
params["includeCursor"] = opts.include_cursor
if opts.cursor:
params["cursor"] = opts.cursor
return params

def _chunk_time_range(self, start_time: datetime, end_time: datetime):
chunk_size = timedelta(minutes=1)
max_chunks = 100

        total_chunks = int((end_time - start_time) / chunk_size)
        # round up so a trailing partial chunk is not silently dropped
        if start_time + total_chunks * chunk_size < end_time:
            total_chunks += 1

if total_chunks > max_chunks:
chunk_size = timedelta(
minutes=(end_time - start_time).total_seconds() / max_chunks / 60
)
total_chunks = max_chunks

chunked_ranges = []

for i in range(total_chunks):
chunk_start = start_time + i * chunk_size
chunk_end = min(chunk_start + chunk_size, end_time)
chunked_ranges.append((chunk_start, chunk_end))

return chunked_ranges
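For intuition: a 30-minute range yields 30 one-minute chunks, while a 10-hour range (600 minutes) exceeds the 100-chunk cap and is split into 100 six-minute chunks instead. Calling the private helper directly, purely for illustration:

    start = datetime(2024, 2, 13, 0, 0)
    ranges = client._chunk_time_range(start, start + timedelta(hours=10))
    len(ranges)                  # 100
    ranges[0][1] - ranges[0][0]  # timedelta(minutes=6)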
1 change: 1 addition & 0 deletions axiom/datasets.py
@@ -1,4 +1,5 @@
"""This package provides dataset models and methods as well as a DatasetClient"""

import ujson
from logging import Logger
from requests import Session
1 change: 1 addition & 0 deletions axiom/logging.py
@@ -1,4 +1,5 @@
"""Logging contains the AxiomHandler and related methods to do with logging."""

import time
import atexit

2 changes: 1 addition & 1 deletion axiom/query/filter.py
@@ -4,7 +4,7 @@


class FilterOperation(Enum):
"""A FilterOperatuib can be applied on queries to filter based on different conditions."""
"""A FilterOperation can be applied on queries to filter based on different conditions."""

EMPTY = ""
AND = "and"
6 changes: 3 additions & 3 deletions axiom/query/options.py
@@ -5,11 +5,11 @@

@dataclass
class QueryOptions:
# StreamingDuration of a query.
streamingDuration: timedelta = field(default=None)
# streaming_duration of a query.
streaming_duration: timedelta = field(default=None)
# NoCache omits the query cache.
nocache: bool = field(default=False)
# SaveKind saves the query on the server with the given query kind. The ID
# of the saved query is returned with the query result as part of the
# response. `query.APL` is not a valid kind for this field.
saveAsKind: QueryKind = field(default=QueryKind.ANALYTICS)
save_as_kind: QueryKind = field(default=QueryKind.ANALYTICS)
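After the rename, callers construct the options with snake_case fields. A minimal sketch:

    opts = QueryOptions(
        streaming_duration=timedelta(minutes=1),
        nocache=True,
        save_as_kind=QueryKind.ANALYTICS,
    )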
2 changes: 1 addition & 1 deletion axiom/query/query.py
@@ -1,7 +1,7 @@
from enum import Enum
from typing import List, Optional
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from datetime import datetime
from .aggregation import Aggregation
from .filter import Filter
