## Collect Parameters
First we need to gather all of the configuration parameters that went into the initial processing so that we can begin to organize them into usable objects and define reasonable defaults. The following cells summarize the parameters used in each step.

```python
# Create an AOI geometry
from shapely import Point
import geopandas

point_of_interest = Point(-90.47416614755436, 47.738145812431185)  # EPSG:4326
input_gdf: geopandas.GeoDataFrame = geopandas.GeoDataFrame(
    geometry=[point_of_interest], crs="EPSG:4326"
)
```

In [1]:
from geopandas import GeoDataFrame, GeoSeries

input_geometry: GeoDataFrame | GeoSeries

```python
# Select intersecting tile(s)
from pathlib import Path


tiles_kwargs = {
    "filename": Path("../data/interim/tile_index.gpkg").resolve(),
    "layer": "tile_index",
    "mask": input_gdf.to_crs("EPSG:6344"),
    "where": "workunit='MN_RainyLake_1_2020'",
}
```

In [2]:
from pathlib import Path
from typing import Any


tile_index_source: Path  # .gpkg or .parquet created from build_tile_index.py
tile_index_read_kwargs: dict[str, Any]  # Might not be necessary; just use an intersect

```python
# Copied from ../config/mn_rainylake_1_2020.json
ept_json_href = (
    "https://s3-us-west-2.amazonaws.com/usgs-lidar-public/MN_RainyLake_1_2020/ept.json"
)
enriched_tiles = tiles.assign(ept_json_href=ept_json_href)
```

In [3]:
from pydantic import HttpUrl


# Either add this field to the tile layer or create a separate file/layer to read this from
ept_json_href: HttpUrl

```python
# Create buffered tile WKT
buffer_dist = 5  # meters
pipeline_params["buffered_wkt"] = (
    pipeline_params.buffer(distance=buffer_dist).to_crs("EPSG:3857").to_wkt()
)
```

In [4]:
buffer_n_cells: int = 3  # Multiply by the resolution to get the buffer distance
ept_crs: str  # Read from ept.json

```python
# Assign the EPSG string of the tiles to a field
pipeline_params["out_srs"] = pipeline_params.crs.srs
```

In [5]:
# The tile bounds are used later in the pipeline to define the raster extents,
# so the reprojection crs should always be the same as the tiles.

out_srs: str  # Calculate from tiles

```python
# Assign fields for the filters.faceraster parameters
resolution = 0.30  # meters

pipeline_params["resolution"] = resolution
pipeline_params["width"] = (
    (
        pipeline_params.geometry.bounds["maxx"]
        - pipeline_params.geometry.bounds["minx"]
        + resolution
    )
    / resolution
).astype(int)
pipeline_params["height"] = (
    (
        pipeline_params.geometry.bounds["maxy"]
        - pipeline_params.geometry.bounds["miny"]
        + resolution
    )
    / resolution
).astype(int)
pipeline_params["origin_x"] = pipeline_params.geometry.bounds["minx"]
pipeline_params["origin_y"] = pipeline_params.geometry.bounds["miny"]
```

In [6]:
resolution: float
width: int  # Calculate from tile and resolution
height: int  # Calculate from tile and resolution
origin_x: float  # Calculate from tile
origin_y: float  # Calculate from tile

```python
# Assign the output_file field
output_dir = Path("../data/processed").resolve()
pipeline_params["output_file"] = "../data/processed/" + pipeline_params["name"] + "_1ft.tif"
```

In [7]:
output_dir: Path
output_prefix: str | None
output_postfix: str | None

## User defined parameters
The following cell restates the variables above that need to be set by the user

In [8]:
# User defined parameters
input_geometry: GeoDataFrame | GeoSeries
tile_index_source: Path  # .gpkg or .parquet created from build_tile_index.py
buffer_n_cells: int = 3  # Multiply by the resolution to get the buffer distance
resolution: float
output_dir: Path
output_prefix: str | None
output_postfix: str | None

In [9]:
from pydantic import BaseModel
from typing import Optional


class PDALReader(BaseModel):
    pass


class PDALFilter(BaseModel):
    pass


class PDALWriter(BaseModel):
    pass


class FaceRasterWriter(PDALWriter):
    tag: str = "face_raster_writer"
    type: str = "writers.raster"
    gdaldriver: str = "GTiff"
    gdalopts: str = "COMPRESS=DEFLATE"
    data_type: str = "float32"
    nodata: int | float = -999999

    def stage_dict(self, filename: str):
        d = self.dict(exclude_none=True)
        d.update(filename=filename)
        return d


class EptPipeline(BaseModel):
    tile_index_source: Path
    resolution: float
    buffer_n_cells: int = 3
    pdal_writers: list[PDALWriter]


In [10]:
faceraster = FaceRasterWriter()
# faceraster.dict(exclude_none=True)
faceraster.stage_dict("123.tif")

{'tag': 'face_raster_writer',
 'type': 'writers.raster',
 'gdaldriver': 'GTiff',
 'gdalopts': 'COMPRESS=DEFLATE',
 'data_type': 'float32',
 'nodata': -999999,
 'filename': '123.tif'}

## Pipeline construction classes

In [32]:
from __future__ import annotations
from dataclasses import dataclass
from typing import Protocol
from shapely import Polygon
import pdal
from geopandas import GeoDataFrame


StageDict = dict[str, str | int | float]


@dataclass
class PDALPipelineFactory:
    point_source: PDALPointSource
    products: list[PDALProduct]

    def __call__(self, tile: Tile) -> pdal.Pipeline:
        ...


@dataclass
class Tile:
    name: str
    epsg_code: int
    ept_json_url: str
    geom: Polygon

    @staticmethod
    def from_gdf(tiles_gdf: GeoDataFrame) -> list[Tile]:
        ...

    @property
    def origin_x(self) -> float:
        ...

    @property
    def origin_y(self) -> float:
        ...

    def width(self, resolution: float) -> int:
        ...

    def height(self, resolution: float) -> int:
        ...

    def to_wkt(self) -> str:
        ...

    def buffer(self, dist: float) -> Tile:
        ...

    def buffered_ept_filter_as_wkt(self, dist: float) -> str:
        ...


class PDALPointSource(Protocol):

    def get_point_source_stages(self, tile: Tile, buffer_dist: float) -> list[StageDict]:
        ...


class PDALProduct(Protocol):

    def get_product_stages(self, tile: Tile) -> list[StageDict]:
        ...

In [14]:
@dataclass
class VendorClassifiedGroundPoints:
    buffer_ept_filter_dist: float
    tag: str = "vendor_classified_ground_points"

    def get_point_source_stages(self, tile: Tile) -> list[StageDict]:
        return [
            {
                "tag": "read_data",
                "type": "readers.ept",
                "filename": tile.ept_json_url,
                "polygon": tile.buffered_ept_filter_as_wkt(self.buffer_ept_filter_dist),
            },
            {
                "tag": "ground_only",
                "type": "filters.range",
                "limits": "Classification[2:2]",
            },
            {
                "tag": self.tag,
                "type": "filters.reprojection",
                "out_srs": f"EPSG:{tile.epsg_code}",
            },
        ]

In [17]:
from typing import Optional


@dataclass
class DEMFromDelauneyMesh:
    input_tag: str
    resolution: float
    output_dir: Path
    output_prefix: Optional[str]
    output_postfix: Optional[str]
    output_ext: str = ".tif"
    gdaldriver: str = "GTiff"
    gdalopts: str = "COMPRESS=DEFLATE"
    data_type: str = "float32"
    nodata: int | float = -999999

    def get_pipeline_stages(self, tile: Tile):
        return [
            {
                "tag": "faceraster",
                "type": "filters.faceraster",
                "inputs": [self.input_tag],
                "resolution": self.resolution,
                "width": tile.width(self.resolution),
                "height": tile.height(self.resolution),
                "origin_x": tile.origin_x,
                "origin_y": tile.origin_y,
            },
            {
                "tag": "write_raster",
                "type": "writers.raster",
                "filename": str(self.output_dir),
                "gdaldriver": self.gdaldriver,
                "gdalopts": self.gdalopts,
                "data_type": self.data_type,
                "nodata": self.nodata,
            },
        ]


## Example usage

In [31]:
# Define point source
point_source = VendorClassifiedGroundPoints(
    buffer_ept_filter_dist=10  # meters
)

# Define pipeline product
dem = DEMFromDelauneyMesh(
    input_tag="vendor_classified_ground_points",
    resolution=0.5,  # meters
    output_dir=Path("../data/interim/").resolve(),
    output_prefix="experiment01_",
    output_postfix="_half_meter_dem",
)

# Select tiles
tiles_gdf = GeoDataFrame({
    "name": ["dummy_tile"],
    "epsg_code": [1234],
    "ept_json_url": ["http://example.com"],
    "geometry": [Polygon(((0., 0.), (0., 1.), (1., 1.), (1., 0.), (0., 0.)))]
})
tiles = Tile.from_gdf(tiles_gdf)

# Generate pipelines
pipeline_factory = PDALPipelineFactory(
    point_source=point_source,
    products=[dem]
)
pipelines = [pipeline_factory(tile) for tile in tiles]

TypeError: 'NoneType' object is not iterable