# core

> Scan and query chronicle parquet files.

This package uses [polars](https://pola-rs.github.io/polars/py-polars/html/index.html) to read chronicle parquet files and exposes a simple API to query the resulting data.

In [None]:
#| default_exp core

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import polars as pl
# import pyarrow.parquet as pq
# import pyarrow.dataset as ds
# from s3fs import S3FileSystem
import pandas as pd
from fastcore.basics import patch
import re

## Scan chronicle parquet files

Chronicle collects and stores logs and metrics in a series of parquet files.

Use `scan_chronicle_logs()` to read logs and `scan_chronicle_metrics()` to read metrics, by specifying the path to the parquet set you need.

The file tree looks like this, with `logs` and `metrics` in separate folders inside `v1`.

``` bash
.
└── v1/
    ├── logs/
    └── metrics/
```

Inside both `logs` and `metrics` the data is stored by date, separated by year, month and day.

``` bash
.
└── v1/
    ├── logs/
    │   └── 2023/
    │       ├── 02/
    │       │   ├── 01
    │       │   ├── 02
    │       │   ├── 03
    │       │   ├── 04
    │       │   ├── 05
    │       │   └── ...
    │       ├── 03
    │       ├── 04
    │       └── ...
    └── metrics/
        └── 2023/
            ├── 02/
            │   ├── 01
            │   ├── 02
            │   ├── 03
            │   ├── 04
            │   ├── 05
            │   └── ...
            ├── 03
            ├── 04
            └── ...
```

## Using the scan interface

In [None]:
#| export
def scan_chronicle(
        path: str, # Path to dataset
        type: str = "", # must be `metrics` or `logs`
        date:str = None, # date in format `YYYY/MM/DD` (or `YYYY-MM-DD`); all dates if omitted
        filename: str = None, # name of parquet file; defaults to every `*.parquet` file
        version: str = "v1" # currently must be `v1`
    ) -> pl.LazyFrame:
    """Lazily scan chronicle parquet files into a polars LazyFrame.

    Builds the glob `{path}/{version}/{type}/{date}/{filename}` and hands it
    to `pl.scan_parquet`, so nothing is read until the frame is collected.
    Dates written as `YYYY-MM-DD` are converted to the `YYYY/MM/DD` folder
    layout.
    """
    # `*/*/*` matches every year/month/day folder when no date is given.
    date = "*/*/*" if date is None else date.replace("-", "/")
    if filename is None:
        filename = "*.parquet"
    return pl.scan_parquet(f"{path}/{version}/{type}/{date}/{filename}")



In [None]:

scan_chronicle(path="./data", type="metrics", date="2023/04/03")


In [None]:
#| export
def scan_chronicle_metrics(
        path: str, # Path to dataset
        date:str = None, # date in format `YYYY/MM/DD`; all dates if omitted
        version: str = "v1" # currently must be `v1`
) -> pl.LazyFrame:
    "Lazily scan chronicle metrics parquet files into a polars LazyFrame."
    return scan_chronicle(path, "metrics", date, version = version)

def scan_chronicle_logs(
        path: str, # Path to dataset
        date:str = None, # date in format `YYYY/MM/DD`; all dates if omitted
        version: str = "v1" # currently must be `v1`
) -> pl.LazyFrame:
    "Lazily scan chronicle logs parquet files into a polars LazyFrame."
    return scan_chronicle(path, "logs", date, version = version)


In [None]:
# The metrics scan is lazy and exposes the full chronicle metrics schema.
z = scan_chronicle_metrics("./data", "2023/04/03")
assert isinstance(z, pl.LazyFrame)
assert z.collect().columns == [
    'service',
    'host',
    'os',
    'attributes',
    'name',
    'description',
    'unit',
    'type',
    'timestamp',
    'value_float',
    'value_int',
    'value_uint',
    'value_column'
]

In [None]:

# The logs scan is lazy and exposes the chronicle logs schema.
z = scan_chronicle_logs("./data", "2023/04/03")
assert isinstance(z, pl.LazyFrame)
assert z.collect().columns == [
    'service', 
    'host', 
    'os', 
    'attributes', 
    'body', 
    'timestamp'
]

## Analyse metrics

In [None]:
#| export
@pl.api.register_lazyframe_namespace("metrics")
class ChronicleMetrics:
    "Chronicle metrics helpers, available on any polars LazyFrame as `ldf.metrics`."
    def __init__(self, 
                 ldf: pl.LazyFrame # A `polars` LazyFrame of chronicle metrics
                 ) -> None:
        "Initialise a chronicle metrics namespace wrapping `ldf`."
        self._ldf = ldf



Use `.metrics.describe()` to get a DataFrame of the unique metrics in the metrics data, containing the `service`, `name`, `description` and `value_column` of each metric.

In [None]:
#| export
@patch
def describe(self: ChronicleMetrics) -> pd.DataFrame:
    """Summarise the distinct metrics in the data as a pandas DataFrame.

    Returns one row per (`service`, `name`) pair, sorted by those columns.
    `description` and `value_column` contain the distinct values seen for
    that metric, joined with ", ".
    """
    ldf = self._ldf
    # polars 0.19 renamed `groupby` to `group_by` and later dropped the old
    # name; look up whichever this polars version provides.
    group_by = getattr(ldf, "group_by", None) or ldf.groupby
    return (
        group_by("service", "name")
        .agg(
            # maintain_order keeps the joined strings stable between runs.
            pl.col("description").unique(maintain_order=True),
            pl.col("value_column").unique(maintain_order=True),
        )
        .with_columns(
            pl.col("description").list.join(", "),
            # Use a separator so several distinct value columns stay readable.
            pl.col("value_column").list.join(", "),
        )
        .sort("service", "name")
        .collect()
        .to_pandas()
    )


In [None]:

m = scan_chronicle_metrics(path="./data", date="2023/04/03").metrics.describe()
assert m.columns.tolist() == ['service', 'name', 'description', 'value_column']
m

Use `.metrics.filter()` to extract a single metric, selected by the `name` column and optionally restricted to one `service`.

In [None]:
#| export
@patch
def filter(self: ChronicleMetrics, 
        name:str, # name of metric to extract
        service:str = None, # service to extract metric from; all services if omitted
        alias:str = None # alias to use for new column; defaults to `name`
    ) -> pd.DataFrame:
    """Extract a single metric from a metrics frame as a pandas DataFrame.

    Keeps rows whose `name` equals `name` (and whose `service` equals
    `service`, when given). Returns `host`, `timestamp` and `value_float`
    renamed to `alias`, sorted by host then timestamp.
    """
    if alias is None:
        alias = name

    predicate = pl.col("name") == name
    if service is not None:
        predicate = predicate & (pl.col("service") == service)

    # NOTE(review): only `value_float` is returned; metrics stored in
    # `value_int` / `value_uint` would come back as nulls — confirm intended.
    return (
        self._ldf
        .filter(predicate)
        .sort("host", "timestamp")
        .select("host", "timestamp", pl.col("value_float").alias(alias))
        .collect()
        .to_pandas()
    )


In [None]:
m = scan_chronicle_metrics("./data", "2023/04/03").metrics.filter("rsconnect_system_memory_used")
assert isinstance(m, pd.DataFrame)
assert list(m) == ['host', 'timestamp', 'rsconnect_system_memory_used']

m = scan_chronicle_metrics("./data", "2023/04/03").metrics.filter("rsconnect_system_memory_used", alias="memory")
assert isinstance(m, pd.DataFrame)
assert list(m) == ['host', 'timestamp', 'memory']

m = scan_chronicle_metrics("./data", "2023/04/03").metrics.filter("rsconnect_system_memory_used", service = "connect-metrics", alias = "memory")
assert isinstance(m, pd.DataFrame)
assert list(m) == ['host', 'timestamp', 'memory']


## Analyse logs

In [None]:

#| export
@pl.api.register_lazyframe_namespace("logs")
class ChronicleLogs:
    "Chronicle log helpers, available on any polars LazyFrame as `ldf.logs`."
    def __init__(self, 
                 df: pl.LazyFrame # A polars LazyFrame of chronicle logs
                 ) -> None:
        "Initialise a chronicle logs namespace wrapping `df`."
        self._ldf = df

### Filter logs on type

You can use `.logs.filter_type()` to keep only the logs whose JSON `body` has a given `type` value.

In [None]:

scan_chronicle_logs(path="./data").head(n=1).explode("attributes").collect()

In [None]:
#| export
@patch
def filter_type(self: ChronicleLogs,
                value: str # value of the JSON `type` field to keep
    ) -> pl.DataFrame:
    """Extract all logs whose JSON `body` has `type == value`.

    The parsed field is added as a `.type` column. Returns a collected polars
    DataFrame.
    """
    return (
        self._ldf
        .with_columns([
            pl.col("body").str.json_path_match("$.type").alias(".type"),
        ])
        # str() mirrors the original f-string coercion of `value`.
        .filter(pl.col(".type") == str(value))
        .collect()
    )

In [None]:
# Use a log type that the workbench helpers below also rely on.
logs = scan_chronicle_logs("./data").logs.filter_type("auth_login")
assert isinstance(logs, pl.DataFrame)
assert (logs[".type"] == "auth_login").all()

### Unique Connect actions

In [None]:

#| export
@patch
def unique_connect_actions(self: ChronicleLogs,
    ) -> pd.DataFrame:
    """Return one sample log row for each distinct Connect `action`.

    Keeps rows where `service == "connect"`, parses `action` from the JSON
    `body`, and keeps one row per distinct action; which row is kept is left
    to `unique`. Columns: service, action, attributes, body.
    """
    return (
        self._ldf
        .filter(pl.col("service") == "connect")
        # Only `action` is needed; other parsed fields were dropped by the select.
        .with_columns(
            pl.col("body").str.json_path_match("$.action").alias("action"),
        )
        .unique("action")
        .select("service", "action", "attributes", "body")
        .collect()
        .to_pandas()
    )


In [None]:
connect_actions = scan_chronicle_logs("./data").logs.unique_connect_actions()
assert isinstance(connect_actions, pd.DataFrame)
assert list(connect_actions) == ["service", "action", "attributes", "body"]
connect_actions

### Connect logins

In [None]:

#| export
@patch
def connect_logins(
    self: ChronicleLogs,
    ) -> pl.DataFrame:
    """List Connect user logins.

    Parses `type`, `action` and `actor_description` (as `username`) from the
    JSON `body`. Keeps Connect rows whose type is "audit" and whose action is
    "user_login". Returns a collected polars DataFrame with host, timestamp,
    username, action and type.
    """
    body = pl.col("body").str
    parsed = self._ldf.with_columns([
        body.json_path_match("$.type").alias("type"),
        body.json_path_match("$.action").alias("action"),
        body.json_path_match("$.actor_description").alias("username"),
    ])
    is_connect_login = (
        (pl.col("service") == "connect")
        & (pl.col("type") == "audit")
        & (pl.col("action") == "user_login")
    )
    logins = parsed.filter(is_connect_login)
    return logins.select("host", "timestamp", "username", "action", "type").collect()


In [None]:

path = "./data"
scan_chronicle_logs(path=path).logs.connect_logins()

### Extract connect audit logs

In [None]:
#| export
@patch
def extract_connect_audit_logs(
    self: ChronicleLogs,
    type: str,
    ) -> pl.DataFrame:
    """Extract Connect log rows for a single action.

    Despite its name, `type` is compared against the JSON `action` field
    (e.g. "user_login"), not the JSON `type` field. Returns a collected polars
    DataFrame with host, timestamp, username, action, type, guid, msg and
    attributes.
    """
    # Output column -> JSON path inside `body`; insertion order is preserved.
    json_fields = {
        "type": "$.type",
        "action": "$.action",
        "username": "$.actor_description",
        "guid": "$.actor_guid",
        "msg": "$.msg",
    }
    parsed = self._ldf.with_columns([
        pl.col("body").str.json_path_match(json_path).alias(column)
        for column, json_path in json_fields.items()
    ])
    wanted = (pl.col("service") == "connect") & (pl.col("action") == type)
    return (
        parsed
        .filter(wanted)
        .select("host", "timestamp", "username", "action", "type", "guid", "msg", "attributes")
        .collect()
    )




In [None]:

path = "./data"
scan_chronicle_logs(path=path).logs.extract_connect_audit_logs(type="user_login")

### Unique workbench types

In [None]:

#| export
@patch
def unique_workbench_types(self: ChronicleLogs,
    ) -> pd.DataFrame:
    """Return one sample log row for each distinct Workbench log `type`.

    Keeps rows where `service == "workbench"`, parses `type` from the JSON
    `body`, and keeps one row per distinct type; which row is kept is left to
    `unique`. Columns: service, type, attributes, body.
    """
    return (
        self._ldf
        .filter(pl.col("service") == "workbench")
        # Only `type` is needed; the parsed username was dropped by the select.
        .with_columns(
            pl.col("body").str.json_path_match("$.type").alias("type"),
        )
        .unique("type")
        .select("service", "type", "attributes", "body")
        .collect()
        .to_pandas()
    )


In [None]:
workbench_types = scan_chronicle_logs("./data").logs.unique_workbench_types()
assert isinstance(workbench_types, pd.DataFrame)
assert list(workbench_types) == ["service", "type", "attributes", "body"]
workbench_types

### Workbench logins

In [None]:
#| export
@patch
def workbench_logins(
    self: ChronicleLogs,
    ) -> pl.DataFrame:
    """List Workbench `auth_login` events.

    Parses `type`, `action` and `username` from the JSON `body` and keeps
    Workbench rows whose type is "auth_login". Returns a collected polars
    DataFrame with host, timestamp, username, action and type.
    """
    json_fields = [("type", "$.type"), ("action", "$.action"), ("username", "$.username")]
    parsed = self._ldf.with_columns([
        pl.col("body").str.json_path_match(json_path).alias(column)
        for column, json_path in json_fields
    ])
    is_login = (pl.col("service") == "workbench") & (pl.col("type") == "auth_login")
    logins = parsed.filter(is_login)
    return logins.select("host", "timestamp", "username", "action", "type").collect()


In [None]:

path = "./data"
scan_chronicle_logs(path=path).logs.workbench_logins()

### Extract workbench audit logs

In [None]:
#| export
@patch
def extract_workbench_audit_logs(
    self: ChronicleLogs,
    type: str, # value of the JSON `type` field to keep, e.g. "session_start"
    ) -> pl.DataFrame:
    """Extract Workbench audit logs of a single `type`.

    Parses `type`, `action` and `username` from the JSON `body` and keeps
    Workbench rows whose parsed `type` equals the `type` argument. Returns a
    collected polars DataFrame with host, timestamp, username, action, type
    and attributes.
    """
    return (
        self._ldf
        .with_columns([
            pl.col("body").str.json_path_match("$.type").alias("type"),
            pl.col("body").str.json_path_match("$.action").alias("action"),
            pl.col("body").str.json_path_match("$.username").alias("username"),
        ])
        .filter(
            (pl.col("service") == "workbench") &
            (pl.col("type") == type)
        )
        .select("host", "timestamp", "username", "action", "type", "attributes")
        .collect()
    )



In [None]:

path = "./data"
scan_chronicle_logs(path=path).logs.extract_workbench_audit_logs(type="session_start")

In [None]:
#| export
@patch
def extract_workbench_audit_cols(
    self: ChronicleLogs,
    type: str, # value of the JSON `type` field to keep, e.g. "session_quit"
    ) -> pl.DataFrame:
    """Extract Workbench audit columns for logs of a single `type`.

    Parses `type`, `action` and `username` from the JSON `body` and keeps
    Workbench rows whose parsed `type` equals the `type` argument. Returns a
    collected polars DataFrame with host, timestamp, username, action, type
    and attributes. This is currently the same output as
    `extract_workbench_audit_logs`.
    """
    # TODO: `attributes` is returned unexpanded; exploding/unnesting it into
    # separate columns was explored but not finished.
    return (
        self._ldf
        .with_columns([
            pl.col("body").str.json_path_match("$.type").alias("type"),
            pl.col("body").str.json_path_match("$.action").alias("action"),
            pl.col("body").str.json_path_match("$.username").alias("username"),
        ])
        .filter(
            (pl.col("service") == "workbench") &
            (pl.col("type") == type)
        )
        .select("host", "timestamp", "username", "action", "type", "attributes")
        .collect()
    )



In [None]:
scan_chronicle_logs(path="./data").logs.extract_workbench_audit_cols(type="session_quit")

In [None]:
#| hide
# Export the `#| export` cells of this notebook into the library module.
import nbdev
nbdev.nbdev_export()