# Typed Objects

In addition to the interfaces offered by `ObjectAPIClient` and `DownloadedObject`, which provide access to Geoscience Objects in a way that is agnostic to the specific object type, the `evo-objects` package also provides a set of "typed objects" that represent specific object types.


As usual, we first need to authenticate, which we can do using the `ServiceManagerWidget` from the `evo-notebooks` package:

In [None]:
from evo.notebooks import ServiceManagerWidget

manager = await ServiceManagerWidget.with_auth_code(
    client_id="your-client-id", cache_location="./notebook-data"
).login()

## Working with Block Models

Block models are stored in the Block Model Service, which is separate from the Geoscience Object Service. The `BlockModel` class provides a unified interface that handles both services: it creates the block model in the Block Model Service and automatically creates a corresponding Geoscience Object reference.

### Creating a Regular Block Model

To create a new block model, use `BlockModel.create_regular()` with a `RegularBlockModelData` object:


In [None]:
import numpy as np
import pandas as pd

from evo.blockmodels.typed import Units
from evo.objects.typed import BlockModel, Point3, RegularBlockModelData, Size3d, Size3i

# Create sample block model data
# Define the grid parameters
origin = (0, 0, 0)
n_blocks = (10, 10, 5)
block_size = (2.5, 5.0, 5.0)
total_blocks = n_blocks[0] * n_blocks[1] * n_blocks[2]

# Generate block centroid coordinates (x, y, z)
# Centroids are at the center of each block
centroids = []
for k in range(n_blocks[2]):
    for j in range(n_blocks[1]):
        for i in range(n_blocks[0]):
            # Calculate centroid position: origin + (index + 0.5) * block_size
            x = origin[0] + (i + 0.5) * block_size[0]
            y = origin[1] + (j + 0.5) * block_size[1]
            z = origin[2] + (k + 0.5) * block_size[2]
            centroids.append((x, y, z))

# Create DataFrame with x, y, z coordinates (more user-friendly than i, j, k)
block_data = pd.DataFrame(
    {
        "x": [c[0] for c in centroids],
        "y": [c[1] for c in centroids],
        "z": [c[2] for c in centroids],
        "grade": np.random.rand(total_blocks) * 10,  # Random grade values 0-10
        "density": np.random.rand(total_blocks) * 2 + 2,  # Random density 2-4
    }
)

# Create the block model using BlockModel.create_regular()
# Use the Units class for valid unit IDs
bm_data = RegularBlockModelData(
    name="Example Block Model",
    description="A sample block model created from the notebook",
    origin=Point3(x=origin[0], y=origin[1], z=origin[2]),
    n_blocks=Size3i(nx=n_blocks[0], ny=n_blocks[1], nz=n_blocks[2]),
    block_size=Size3d(dx=block_size[0], dy=block_size[1], dz=block_size[2]),
    cell_data=block_data,
    coordinate_reference_system="EPSG:28354",
    size_unit_id=Units.METRES,
    units={"grade": Units.GRAMS_PER_TONNE, "density": Units.TONNES_PER_CUBIC_METRE},
)

# Create the block model - this handles both Block Model Service and Geoscience Object creation
block_model = await BlockModel.create_regular(manager, bm_data)

print(f"Created block model: {block_model.name}")
print(f"Block Model UUID: {block_model.block_model_uuid}")
print(f"Geometry type: {block_model.geometry.model_type}")
print(f"Origin: {block_model.geometry.origin}")
print(f"N blocks: {block_model.geometry.n_blocks}")
print(f"Block size: {block_model.geometry.block_size}")
print(f"Attributes: {[attr.name for attr in block_model.attributes]}")
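
The nested loops above build the centroids one block at a time. As a minimal sketch that uses only NumPy and pandas (no Evo API calls), the same coordinates can be generated with `np.meshgrid`. The `centroid_frame` helper name is hypothetical and not part of `evo-objects`; it reproduces the loop's row order, with `i` varying fastest and `k` slowest.

```python
import numpy as np
import pandas as pd


def centroid_frame(origin, n_blocks, block_size):
    """Return block centroids as a DataFrame with x, y, z columns.

    Hypothetical helper: rows are ordered with i (x) varying fastest and
    k (z) slowest, matching the nested loops above.
    """
    # Centroid along each axis: origin + (index + 0.5) * block_size
    axes = [
        origin[a] + (np.arange(n_blocks[a]) + 0.5) * block_size[a]
        for a in range(3)
    ]
    # Build grids in (k, j, i) order so that a C-order ravel makes i fastest
    zz, yy, xx = np.meshgrid(axes[2], axes[1], axes[0], indexing="ij")
    return pd.DataFrame({"x": xx.ravel(), "y": yy.ravel(), "z": zz.ravel()})


centroids_df = centroid_frame((0, 0, 0), (10, 10, 5), (2.5, 5.0, 5.0))
print(len(centroids_df))
print(centroids_df.head())
```

Attribute columns such as `grade` and `density` could then be added to this frame before passing it as `cell_data`.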

### Loading an Existing Block Model

You can load an existing `BlockModel` using `from_reference`, just like other typed objects:

In [None]:
# Load the block model we just created using its URL
loaded_bm = await BlockModel.from_reference(manager, block_model.metadata.url)

print(f"Loaded BlockModel: {loaded_bm.name}")
print(f"Block Model UUID: {loaded_bm.block_model_uuid}")
print(f"Bounding Box: {loaded_bm.bounding_box}")

### Accessing Block Model Data

The `BlockModel` class provides methods to read the block data stored in the Block Model Service:

In [None]:
# Get all block data as a DataFrame
df = await block_model.to_dataframe(columns=["*"])
print(f"Retrieved {len(df)} blocks")
df.head(10)

In [None]:
# You can also query specific columns
grade_data = await block_model.to_dataframe(columns=["grade"])
print("Grade statistics:")
print(grade_data["grade"].describe())

### Adding New Attributes to a Block Model

You can add new attributes to the block model. This creates a new version in the Block Model Service:

In [None]:
# Calculate a new attribute based on existing data
# Get current data including coordinates
df = await block_model.to_dataframe(columns=["x", "y", "z", "grade", "density"])

# Create new attribute DataFrame with x, y, z coordinates
df_with_new_attr = pd.DataFrame(
    {
        "x": df["x"],
        "y": df["y"],
        "z": df["z"],
        # grade (g/t) * density (t/m3) gives g/m3; divide by 1000 for kg/m3
        "metal_content": df["grade"] * df["density"] / 1000,
    }
)

# Add the new attribute to the block model
# Use Units class for valid unit IDs
new_version = await block_model.add_attribute(
    df_with_new_attr,
    attribute_name="metal_content",
    unit=Units.KILOS_PER_CUBIC_METRE,
)

print(f"Added attribute 'metal_content', new version: {new_version.version_id}")

In [None]:
new_version

### Updating Multiple Attributes

For more complex updates (adding multiple columns, updating existing columns, or deleting columns), use `update_attributes`:

In [None]:
# Create data with multiple new attributes using x, y, z coordinates
df_updates = pd.DataFrame(
    {
        "x": df["x"],
        "y": df["y"],
        "z": df["z"],
        "classification": np.where(df["grade"] > 5, "high", "low"),
        "value_index": df["grade"] * df["density"] * 100,
    }
)

# Add multiple new columns at once
version = await block_model.update_attributes(
    df_updates,
    new_columns=["classification", "value_index"],
)

print(f"Updated block model, new version: {version.version_id}")

# Verify the new attributes
updated_df = await block_model.to_dataframe(columns=["*"])
print(f"Columns now available: {list(updated_df.columns)}")

## Block Model Reports

Reports provide resource estimation summaries for block models. They calculate tonnages, grades, and metal content grouped by categories (e.g., geological domains).

**Requirements for reports:**
1. Every column being reported must have a unit defined
2. At least one category column for grouping results
3. Density information (column or fixed value)

**Key classes:**
- `ReportColumnSpec` - Define which columns to report and how to aggregate them
- `ReportCategorySpec` - Define category columns for grouping
- `ReportSpecificationData` - The full report definition
- `MassUnits` - Helper with common mass unit IDs

### Add a Domain Column

First, let's add a category column for grouping. We'll create geological domains by slicing the block model into three zones based on elevation.

In [None]:
# Refresh to get latest data
block_model = await block_model.refresh()
df = await block_model.to_dataframe()

# Create domain column based on z-coordinate (elevation)
# Divide into 3 domains: LMS1 (lower), LMS2 (middle), LMS3 (upper)
z_min, z_max = df["z"].min(), df["z"].max()
z_range = z_max - z_min


def assign_domain(z):
    if z < z_min + z_range / 3:
        return "LMS1"  # Lower zone
    elif z < z_min + 2 * z_range / 3:
        return "LMS2"  # Middle zone
    else:
        return "LMS3"  # Upper zone


df["domain"] = df["z"].apply(assign_domain)

# Add the domain column to the block model
domain_data = df[["x", "y", "z", "domain"]]
version = await block_model.add_attribute(domain_data, "domain")
print(f"Added domain column. New version: {version.version_id}")

# Check domain distribution
print("\nDomain distribution:")
print(df["domain"].value_counts())
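
For larger block models, calling `assign_domain` once per row with `apply` can be slow. The following is a sketch of a vectorized alternative using `np.digitize`, with the same thresholds and the same left-closed intervals as `assign_domain`. The small DataFrame is a stand-in for the block model data and holds only the five centroid elevations of the example grid.

```python
import numpy as np
import pandas as pd

# Stand-in data: the five centroid elevations of the example grid
df = pd.DataFrame({"z": [2.5, 7.5, 12.5, 17.5, 22.5]})

z_min, z_max = df["z"].min(), df["z"].max()
z_range = z_max - z_min
thresholds = [z_min + z_range / 3, z_min + 2 * z_range / 3]

# np.digitize returns 0 below the first threshold, 1 between the two,
# and 2 at or above the second, so it can index straight into the labels
labels = np.array(["LMS1", "LMS2", "LMS3"])
df["domain"] = labels[np.digitize(df["z"].to_numpy(), thresholds)]

print(df)
```

Because the thresholds split the elevation range rather than the block count, the zones need not contain equal numbers of blocks: here two layers fall in `LMS1`, one in `LMS2`, and two in `LMS3`.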

In [None]:
# Refresh to see the new attribute
block_model = await block_model.refresh()
block_model.attributes

### Create and Run a Report

Now we can create a report that calculates tonnages and grades by domain.

**Aggregation options (`Aggregation` enum):**
- `Aggregation.MASS_AVERAGE` - Mass-weighted average, use for **grades** (e.g., Au g/t)
- `Aggregation.SUM` - Sum of values, use for **metal content** (e.g., Au kg)
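
To make the difference between the two aggregations concrete, here is a hand calculation in pandas. It is only an illustration of the arithmetic, not the Block Model Service implementation, and it assumes that block mass is volume times density. The block values and the `mass` and `metal` column names are made up for the example.

```python
import pandas as pd

# Three hypothetical blocks of equal volume (2.5 x 5 x 5 m) in one domain
blocks = pd.DataFrame(
    {
        "domain": ["LMS1", "LMS1", "LMS1"],
        "grade": [1.0, 2.0, 6.0],  # g/t
        "density": [2.0, 2.0, 4.0],  # t/m3
    }
)
block_volume = 2.5 * 5.0 * 5.0  # m3

blocks["mass"] = blocks["density"] * block_volume  # tonnes
blocks["metal"] = blocks["grade"] * blocks["mass"]  # grams (g/t * t)

# Sum-style aggregation: total mass and total metal per domain
totals = blocks.groupby("domain")[["mass", "metal"]].sum()
# Mass-weighted average grade: total metal divided by total mass
totals["grade"] = totals["metal"] / totals["mass"]

print(totals)
```

The mass-weighted grade here is 3.75 g/t, while the unweighted mean of the three grades is 3.0 g/t, because the densest block also carries the highest grade.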

**Output unit options (`Units` class):**
- Grades: `Units.GRAMS_PER_TONNE`, `Units.PERCENT`, `Units.PPM`, `Units.OUNCES_PER_TONNE`
- Metal: `Units.KILOGRAMS`, `Units.TONNES`, `Units.GRAMS`, `Units.TROY_OUNCES`

**Mass unit options (`MassUnits` class):**
- `MassUnits.TONNES` - Metric tonnes ("t")
- `MassUnits.KILOGRAMS` - Kilograms ("kg")
- `MassUnits.OUNCES` - Troy ounces ("oz")

**Density options (choose ONE):**
- `density_column_name="density"` - Use a column (don't set `density_unit_id`)
- `density_value=2.7, density_unit_id="t/m3"` - Use fixed value (both required)
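
The "choose ONE" rule can be checked before building a report. The helper below is a hypothetical sketch and not part of `evo-blockmodels`; the library may well perform its own validation. It only encodes the two combinations listed above and returns keyword arguments using the parameter names from that list.

```python
def density_kwargs(column_name=None, value=None, unit_id=None):
    """Build density keyword arguments, enforcing the 'choose ONE' rule.

    Hypothetical helper, not part of evo-blockmodels.
    """
    # Exactly one of the two density sources must be given
    if (column_name is None) == (value is None):
        raise ValueError("Set exactly one of column_name or value")
    if column_name is not None:
        # A density column carries its own unit
        if unit_id is not None:
            raise ValueError("Do not set unit_id with a density column")
        return {"density_column_name": column_name}
    # A fixed density value needs an explicit unit
    if unit_id is None:
        raise ValueError("A fixed density value requires unit_id")
    return {"density_value": value, "density_unit_id": unit_id}


print(density_kwargs(column_name="density"))
print(density_kwargs(value=2.7, unit_id="t/m3"))
```

The returned dictionary could then be unpacked into the report definition with `**density_kwargs(...)`.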

In [None]:
from evo.blockmodels.typed import (
    Aggregation,
    MassUnits,
    ReportCategorySpec,
    ReportColumnSpec,
    ReportSpecificationData,
    Units,
)

# Define the report
report_data = ReportSpecificationData(
    name="Grade Resource Report",
    description="Resource estimate by domain",
    columns=[
        ReportColumnSpec(
            column_name="grade",
            aggregation=Aggregation.MASS_AVERAGE,  # Use MASS_AVERAGE for grades
            label="Grade",
            output_unit_id=Units.GRAMS_PER_TONNE,  # Use Units class for discoverability
        ),
        # You can add more columns:
        # ReportColumnSpec(column_name="metal", aggregation=Aggregation.SUM, label="Metal", output_unit_id=Units.KILOGRAMS),
    ],
    categories=[
        ReportCategorySpec(
            column_name="domain",
            label="Domain",
            values=["LMS1", "LMS2", "LMS3"],  # Optional: limit to specific values
        ),
    ],
    mass_unit_id=MassUnits.TONNES,  # Output mass in tonnes
    density_column_name="density",  # Use density column (unit comes from column)
    run_now=True,  # Run immediately
)

# Create the report
report = await block_model.create_report(report_data)
print(f"Created report: {report.name}")

In [None]:
# Pretty-print the report (shows BlockSync link)
report

### View Report Results

In [None]:
# Get the latest report result (waits if report is still running)
result = await report.refresh()

# Pretty-print the result (displays table in Jupyter)
result

In [None]:
# Get the BlockSync URL to view the report interactively
print(f"View report in BlockSync: {report.blocksync_url}")