9 changes: 9 additions & 0 deletions analyzer_interface/__init__.py
@@ -9,10 +9,19 @@
AnalyzerInput,
AnalyzerInterface,
AnalyzerOutput,
AnalyzerParam,
DataType,
InputColumn,
OutputColumn,
SecondaryAnalyzerInterface,
WebPresenterInterface,
backfill_param_values,
)
from .params import (
IntegerParam,
ParamType,
ParamValue,
TimeBinningParam,
TimeBinningValue,
)
from .suite import AnalyzerSuite
17 changes: 17 additions & 0 deletions analyzer_interface/context.py
@@ -6,6 +6,7 @@
from pydantic import BaseModel

from .interface import SecondaryAnalyzerInterface
from .params import ParamValue


class PrimaryAnalyzerContext(ABC, BaseModel):
@@ -25,6 +26,14 @@ def input(self) -> "InputTableReader":
"""
pass

@property
@abstractmethod
def params(self) -> dict[str, ParamValue]:
"""
Gets the analysis parameters.
"""
pass

@abstractmethod
def output(self, output_id: str) -> "TableWriter":
"""
@@ -44,6 +53,14 @@ class BaseDerivedModuleContext(ABC, BaseModel):
during its lifetime. This directory will not persist between runs.
"""

@property
@abstractmethod
def base_params(self) -> dict[str, ParamValue]:
"""
Gets the primary analysis parameters.
"""
pass

@property
@abstractmethod
def base(self) -> "AssetsReader":
12 changes: 10 additions & 2 deletions analyzer_interface/declaration.py
@@ -10,18 +10,23 @@
SecondaryAnalyzerInterface,
WebPresenterInterface,
)
from .params import ParamValue


class AnalyzerDeclaration(AnalyzerInterface):
entry_point: Callable[[PrimaryAnalyzerContext], None]
default_params: Callable[[PrimaryAnalyzerContext], dict[str, ParamValue]]
is_distributed: bool

def __init__(
self,
interface: AnalyzerInterface,
main: Callable,
*,
is_distributed: bool = False
is_distributed: bool = False,
default_params: Callable[[PrimaryAnalyzerContext], dict[str, ParamValue]] = (
lambda _: dict()
)
):
"""Creates a primary analyzer declaration

@@ -39,7 +44,10 @@ def __init__(
executable.
"""
super().__init__(
**interface.model_dump(), entry_point=main, is_distributed=is_distributed
**interface.model_dump(),
entry_point=main,
default_params=default_params,
is_distributed=is_distributed
)
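
As a minimal usage sketch (not part of this diff; `my_interface` and `my_main` are hypothetical stand-ins), a declaration can now carry a default-parameter callable alongside the entry point:

# Sketch only: my_interface and my_main are placeholders, not defined in this PR.
my_analyzer = AnalyzerDeclaration(
    interface=my_interface,
    main=my_main,
    default_params=lambda context: {},  # same shape as the built-in default
    is_distributed=False,
)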


59 changes: 59 additions & 0 deletions analyzer_interface/interface.py
@@ -3,6 +3,8 @@
import polars as pl
from pydantic import BaseModel

from .params import ParamType, ParamValue


class BaseAnalyzerInterface(BaseModel):
id: str
@@ -37,6 +39,49 @@ class AnalyzerInput(BaseModel):
columns: list["InputColumn"]


class AnalyzerParam(BaseModel):
id: str
"""
The name of the parameter. This becomes the key in the parameters dictionary
that is passed to the analyzer.
"""

human_readable_name: Optional[str] = None
"""
The human-friendly name for the parameter. This is used in the UI to
represent the parameter.
"""

description: Optional[str] = None
"""
A short description of the parameter. This is used in the UI to represent
the parameter.
"""

type: ParamType
"""
The type of the parameter. This is used for validation and for customizing
the UX for parameter input.
"""

default: Optional[ParamValue] = None
"""
Optional: define a static default value for this parameter. A parameter
without a default will need to be chosen explicitly by the user.
"""

backfill_value: Optional[ParamValue] = None
"""
Recommended if this is a parameter that is newly introduced in a previously
released analyzer. The backfill value shows what this parameter was before it
became customizable.
"""

@property
def print_name(self):
return self.human_readable_name or self.id


class AnalyzerOutput(BaseModel):
id: str
"""
@@ -78,6 +123,11 @@ class AnalyzerInterface(BaseAnalyzerInterface):
input: AnalyzerInput
"""
Specifies the input data schema for the analyzer.
"""

params: list[AnalyzerParam] = []
"""
A list of parameters that the analyzer accepts.
"""

outputs: list["AnalyzerOutput"]
@@ -159,3 +209,12 @@ class InputColumn(Column):

class OutputColumn(Column):
pass


def backfill_param_values(
param_values: dict[str, ParamValue], analyzer_spec: AnalyzerInterface
) -> dict[str, ParamValue]:
return {
param_spec.id: param_values.get(param_spec.id) or param_spec.backfill_value
for param_spec in analyzer_spec.params
}
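
A rough usage sketch of `backfill_param_values` (not part of the diff), using the example analyzer's interface added later in this PR; the import path is an assumption based on the package layout:

# Sketch only: resolve stored parameter values against an analyzer spec.
from analyzer_interface import backfill_param_values
from analyzers.example.example_base.interface import interface

# An older save made before "fudge_factor" existed has no stored value,
# so it resolves to the spec's backfill_value (0 in the example interface).
resolved = backfill_param_values({}, interface)
# resolved == {"fudge_factor": 0}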
92 changes: 92 additions & 0 deletions analyzer_interface/params.py
@@ -0,0 +1,92 @@
from typing import Literal, Union

from pydantic import BaseModel, ConfigDict

TimeBinningUnit = Literal[
"year",
"month",
"week",
"day",
"hour",
"minute",
"second",
]


class IntegerParam(BaseModel):
"""
Represents an integer value

The corresponding value will be of type `int`.
"""

type: Literal["integer"] = "integer"
min: int
max: int


class TimeBinningParam(BaseModel):
"""
Represents a time bin.

The corresponding value will be of type `TimeBinningValue`.
"""

type: Literal["time_binning"] = "time_binning"


class TimeBinningValue(BaseModel):
model_config = ConfigDict(arbitrary_types_allowed=True)

unit: TimeBinningUnit
amount: int

def to_polars_truncate_spec(self) -> str:
"""
Converts the value to a string that can be used in Polars truncate spec.
See https://docs.pola.rs/api/python/stable/reference/expressions/api/polars.Expr.dt.truncate.html
"""
amount = self.amount
unit = self.unit
if unit == "year":
return f"{amount}y"
if unit == "month":
return f"{amount}mo"
if unit == "week":
return f"{amount}w"
if unit == "day":
return f"{amount}d"
if unit == "hour":
return f"{amount}h"
if unit == "minute":
return f"{amount}m"
if unit == "second":
return f"{amount}s"

raise ValueError("Invalid time binning value")

def to_human_readable_text(self) -> str:
amount = self.amount
unit = self.unit

if unit == "year":
return f"{amount} year{'s' if amount > 1 else ''}"
if unit == "month":
return f"{amount} month{'s' if amount > 1 else ''}"
if unit == "week":
return f"{amount} week{'s' if amount > 1 else ''}"
if unit == "day":
return f"{amount} day{'s' if amount > 1 else ''}"
if unit == "hour":
return f"{amount} hour{'s' if amount > 1 else ''}"
if unit == "minute":
return f"{amount} minute{'s' if amount > 1 else ''}"
if unit == "second":
return f"{amount} second{'s' if amount > 1 else ''}"

raise ValueError("Invalid time binning value")


ParamType = Union[TimeBinningParam, IntegerParam]

ParamValue = Union[TimeBinningValue, int]
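
A quick illustration (not part of the diff) of how these parameter values behave, based on the methods defined above; the Polars call mirrors the truncate spec the docstring points to:

# Sketch only: exercising TimeBinningValue as defined above.
from datetime import datetime

import polars as pl

bin_value = TimeBinningValue(unit="hour", amount=6)
bin_value.to_polars_truncate_spec()  # "6h"
bin_value.to_human_readable_text()   # "6 hours"

# The truncate spec plugs directly into Polars' dt.truncate, bucketing
# timestamps into 6-hour windows.
df = pl.DataFrame({"ts": [datetime(2024, 1, 1, 7, 30)]})
df.with_columns(pl.col("ts").dt.truncate(bin_value.to_polars_truncate_spec()))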
8 changes: 8 additions & 0 deletions analyzers/example/example_base/__init__.py
@@ -1,5 +1,6 @@
from analyzer_interface import AnalyzerDeclaration

from .default_params import default_params
from .interface import interface
from .main import main

@@ -8,6 +9,13 @@
example_base = AnalyzerDeclaration(
interface=interface,
main=main,
# Optional.
# If your analyzer is parameterized, you can define a function to suggest
# default analysis parameters based on the input data. For an easier way
# to provide default parameter values independently of input data, simply
# specify them in the analysis parameter specification in the interface itself
# (see `interface.py`).
default_params=default_params,
# This marks the analyzer as distributed or not. A distributed
# analyzer is visible only when the application is packaged. A non-distributed
# analyzer is also visible when the application is run in development mode.
12 changes: 12 additions & 0 deletions analyzers/example/example_base/default_params.py
@@ -0,0 +1,12 @@
from analyzer_interface import ParamValue
from analyzer_interface.context import PrimaryAnalyzerContext


def default_params(context: PrimaryAnalyzerContext) -> dict[str, ParamValue]:
# Like the `main.py` analyzer entry point, the default parameter function
# can use the context object to access the input data. It can then use it to
# suggest parameters in a data-dependent manner.
#
# Data-dependent defaults override static defaults. Importantly, returning
# `None` for a parameter here will *unset* its static default.
return {}
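
For context, a hypothetical sketch (not part of this diff) of what a data-dependent implementation could look like, mirroring the input-reading pattern used in `main.py`; the suggestion logic is made up:

# Hypothetical sketch: suggest the example analyzer's "fudge_factor" from the data.
import polars as pl

from analyzer_interface import ParamValue
from analyzer_interface.context import PrimaryAnalyzerContext


def data_dependent_default_params(
    context: PrimaryAnalyzerContext,
) -> dict[str, ParamValue]:
    input_reader = context.input()
    df_input = input_reader.preprocess(pl.read_parquet(input_reader.parquet_path))
    # Purely illustrative: scale a default off the median message length.
    median_length = df_input.select(
        pl.col("message_text").str.len_chars().median()
    ).item()
    return {"fudge_factor": int(median_length or 0) // 100}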
41 changes: 41 additions & 0 deletions analyzers/example/example_base/interface.py
@@ -2,7 +2,9 @@
AnalyzerInput,
AnalyzerInterface,
AnalyzerOutput,
AnalyzerParam,
InputColumn,
IntegerParam,
OutputColumn,
)

@@ -66,6 +68,45 @@
),
]
),
params=[
AnalyzerParam(
# This corresponds to the key in the parameter dictionary when accessed
# from the analyzer context.
id="fudge_factor",
# This is the human readable name that will be displayed in the user
# interface. It's optional and will fall back to the ID.
human_readable_name="Character Count Fudge Factor",
# Also optional, shown in the UI if provided.
description="""
Adds to the character count, because data manipulation is
good data science and ethically no-problemo.

/s: In seriousness, please *don't* manipulate data.
It's wrong. We are trying to fight misinformation :)
""",
# Follow the definition here to the module where all the supported
# param types are defined.
type=IntegerParam(
# Depending on the parameter type, you may be required to
# provide extra ranges/bounds for validation.
min=-1000,
max=1000,
),
# Optional.
# Sets the default/initial value. This is one of the two ways to set
# a default parameter value, the other being using a default_params
# function (see the `__init__.py` file), which can suggest defaults
# in a data-dependent manner.
default=0,
# Optional.
# If you have an existing analyzer that previously hardcoded a parameter
# that you now want to customize, you can set this to the value it was
# hardcoded to, and older analysis saves will use this value when the
# parameter is shown in the UI and accessed in the web presenter.
# Note: This is NOT a default value!
backfill_value=0,
)
],
outputs=[
AnalyzerOutput(
# This should be locally unique to the analyzer.
14 changes: 13 additions & 1 deletion analyzers/example/example_base/main.py
@@ -20,6 +20,15 @@ def main(context: PrimaryAnalyzerContext):
input_reader = context.input()
df_input = input_reader.preprocess(pl.read_parquet(input_reader.parquet_path))

# The analysis parameters are provided in a dictionary. This is how you
# access them. The dictionary key must match the parameter ID in the interface.
fudge_factor_param = context.params.get("fudge_factor")

# You don't actually need to do this as the app will make sure all
# parameters are of the right type. But it's a good idea anyway.
# It also lets the type checker narrow the type for you.
assert isinstance(fudge_factor_param, int), "Fudge factor must be an integer"

# Now you can start your analysis. The following code is just a minimal example.
#
# The use of the ProgressReporter is optional. It helps breaking a
@@ -28,7 +37,10 @@ def main(context: PrimaryAnalyzerContext):
df_count = df_input.select(
pl.col("message_id"),
# The input and output columns are as you define in the interface.
pl.col("message_text").str.len_chars().alias("character_count"),
pl.col("message_text")
.str.len_chars()
.add(fudge_factor_param)
.alias("character_count"),
)

# If you decide to process the data in small batches
3 changes: 3 additions & 0 deletions analyzers/example/example_report/main.py
@@ -16,6 +16,9 @@ def main(context: SecondaryAnalyzerContext):
.alias("is_long")
)

# This will have been provided or backfilled by the app.
assert context.base_params.get("fudge_factor") is not None

# Save the output to a parquet file. The output ID comes from the secondary
# analyzer's interface.
df_export.write_parquet(context.output("example_report").parquet_path)
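
A small hedged sketch (not in the diff) of how a secondary analyzer or web presenter might surface the provided or backfilled base parameter:

# Sketch only: turn the (possibly backfilled) base parameter into a display label.
fudge_factor = context.base_params.get("fudge_factor")
label = f"Character counts include a fudge factor of {fudge_factor}."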