Batch Processing with Dataflow

Our system relies on being able to calculate a diverse, evolving set of metrics from a robust dataset of individual-level records. The set of metrics we need to calculate will vary from region to region, from stakeholder to stakeholder, even from quarter to quarter or year to year. The granularity of those metrics will need to vary along the same lines, as will the ways in which we deliver them.

A parole officer working for a department of corrections will need a different set of metrics with a different level of granularity delivered through a different medium than a researcher performing broad analysis into the effect of drug criminalization on incarceration and recidivism levels. But they both need metrics derived from the same core dataset, composed of streams from corrections, law enforcement, courts, supervision boards, and more. And if their numbers are based on the same underlying dataset and methodological patterns, they are capable of rationally exchanging and comparing insights.

Batch Processing

Some metrics can be derived through simple query logic via our BigQuery views, such as charge distributions broken down by classification or admission counts broken down by admission reason. Views can be combined in an arbitrary number of layers to roll up various aggregates into more sophisticated metrics. But some metrics cannot be determined via SQL without significant pain, typically because they require looking at the context and events of a person's full history of interaction with the justice system. For those metrics, we use batch processing via Apache Beam/Cloud Dataflow.
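
As a rough illustration of that layering (the project, dataset, and view names below are hypothetical, not the actual views in our codebase), a roll-up view can be defined on top of a simpler view programmatically with the BigQuery client:

    # Sketch: composing BigQuery views in layers (hypothetical names throughout).
    from google.cloud import bigquery

    client = bigquery.Client(project="project-id")

    # A base view: admission counts broken down by admission reason.
    base_view = bigquery.Table("project-id.views.admission_counts_by_reason")
    base_view.view_query = """
        SELECT state_code, admission_reason, COUNT(*) AS admission_count
        FROM `project-id.state.state_incarceration_period`
        GROUP BY state_code, admission_reason
    """
    client.create_table(base_view, exists_ok=True)

    # A second view layered on the first, rolling counts up to state totals.
    rollup_view = bigquery.Table("project-id.views.admission_counts_by_state")
    rollup_view.view_query = """
        SELECT state_code, SUM(admission_count) AS total_admissions
        FROM `project-id.views.admission_counts_by_reason`
        GROUP BY state_code
    """
    client.create_table(rollup_view, exists_ok=True)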

This design provides a calculation pipeline that enables flexible, ongoing metric calculation without high development costs, along with pluggable options for metric persistence to serve evolving use cases.

Beam/Dataflow

The Dataflow pipelines are orchestrated through Cloud Composer, a fully managed GCP service for Apache Airflow that orchestrates operations through directed acyclic graphs (DAGs). The pipelines are triggered by a Pub/Sub topic and run simultaneously on Dataflow. Once a particular state's pipelines have finished running, the BigQuery-to-GCS export is triggered for the data associated with that state.

Using Cloud Composer allows the pipelines to all run at the same time, which will become increasingly valuable as more states are onboarded and more data has to be processed. [Diagram: the state-specific Airflow DAG.]
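
As a rough sketch of that DAG's shape (pipeline names, task ids, and the placeholder callables below are illustrative, not the actual DAG definition), each state's Dataflow pipelines fan out in parallel and all feed a single export task:

    # Sketch of a state-specific DAG: all of a state's Dataflow pipelines run in
    # parallel, and the BigQuery-to-GCS export runs once they have all finished.
    # Pipeline names, task ids, and the placeholder callables are illustrative.
    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator


    def _trigger_dataflow_pipeline(pipeline_name):
        """Placeholder for launching one of the state's Dataflow jobs."""
        print(f"Launching {pipeline_name} pipeline")


    def _export_metrics_to_gcs():
        """Placeholder for the BigQuery-to-GCS export for the state."""
        print("Exporting metric views to GCS")


    with DAG(
        dag_id="us_xx_calculation_pipelines",
        start_date=datetime(2020, 1, 1),
        schedule_interval=None,  # triggered externally via Pub/Sub
        catchup=False,
    ) as dag:
        export = PythonOperator(
            task_id="bigquery_to_gcs_export",
            python_callable=_export_metrics_to_gcs,
        )
        for pipeline in ("recidivism", "incarceration", "supervision"):
            run_pipeline = PythonOperator(
                task_id=f"run_{pipeline}_pipeline",
                python_callable=_trigger_dataflow_pipeline,
                op_kwargs={"pipeline_name": pipeline},
            )
            run_pipeline >> export  # the export waits on every pipeline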

IO

Input

To ensure large-scale batch processing doesn't impact production ingest operations, our batch calculation pipelines read from and write back to our data warehouse, BigQuery. The input and output datasets are separate: the input dataset is the regular full export of a particular schema, such as the state schema dataset, and each pipeline reads its input as the result set of a SQL query against that dataset.

The recidiviz/calculator/utils/extractor_utils.py module contains logic and transforms for pipelines to run SQL queries against BigQuery from within Dataflow and compose the result sets into fully hydrated application-layer entity graphs to be processed by later steps.
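
As a simplified illustration of the read step only (the query, project, and pipeline options below are hypothetical, and the real extraction logic goes much further by hydrating entity graphs), a pipeline might pull its input rows from BigQuery like this:

    # Simplified sketch of a pipeline's read step (hypothetical query and options).
    # The real extractor_utils logic goes further and composes these rows into
    # fully hydrated entity graphs.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    QUERY = """
        SELECT person_id, admission_date, release_date, state_code
        FROM `project-id.state.state_incarceration_period`
        WHERE state_code = 'US_XX'
    """

    options = PipelineOptions(
        runner="DataflowRunner",
        project="project-id",
        region="us-east1",
        temp_location="gs://some-bucket/dataflow-tmp",
    )

    with beam.Pipeline(options=options) as p:
        _ = (
            p
            | "ReadIncarcerationPeriods" >> beam.io.ReadFromBigQuery(
                query=QUERY, use_standard_sql=True)
            | "KeyByPerson" >> beam.Map(lambda row: (row["person_id"], row))
            # ...later steps group and process each person's full history...
        )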

Output

Pipelines write metrics to an output dataset specifically for Dataflow metrics, which contains tables for specific classes of metrics. One pipeline may write multiple metric classes to multiple tables if those different metrics are computed from the same general body of logic. For example, the recidivism calculation pipeline writes recidivism counts to one table, recidivism rates to another table, and average time at liberty prior to reincarceration to a third table, all within a single dataset.
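
A rough sketch of that pattern, with hypothetical table names, schemas, and metric values: the pipeline partitions its computed metrics by class and writes each class to its own table in the metrics dataset.

    # Sketch: one pipeline writing two metric classes to two tables in the
    # Dataflow metrics dataset (table names, schemas, and values are illustrative).
    import apache_beam as beam

    RATE_SCHEMA = ("state_code:STRING,release_cohort:INTEGER,"
                   "follow_up_period:INTEGER,recidivism_rate:FLOAT")
    COUNT_SCHEMA = "state_code:STRING,year:INTEGER,returns:INTEGER"


    def strip_type(metric):
        """Drops the routing field before the row is written to its table."""
        return {k: v for k, v in metric.items() if k != "metric_type"}


    with beam.Pipeline() as p:
        metrics = p | "CreateMetrics" >> beam.Create([
            {"metric_type": "RATE", "state_code": "US_XX", "release_cohort": 2014,
             "follow_up_period": 5, "recidivism_rate": 0.36},
            {"metric_type": "COUNT", "state_code": "US_XX", "year": 2019,
             "returns": 310},
        ])

        # Partition the metrics by class so each class lands in its own table.
        rates, counts = metrics | "SplitByClass" >> beam.Partition(
            lambda m, _: 0 if m["metric_type"] == "RATE" else 1, 2)

        _ = (rates
             | "PrepareRates" >> beam.Map(strip_type)
             | "WriteRates" >> beam.io.WriteToBigQuery(
                 "project-id:dataflow_metrics.recidivism_rate",
                 schema=RATE_SCHEMA,
                 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))

        _ = (counts
             | "PrepareCounts" >> beam.Map(strip_type)
             | "WriteCounts" >> beam.io.WriteToBigQuery(
                 "project-id:dataflow_metrics.recidivism_count",
                 schema=COUNT_SCHEMA,
                 write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))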

There are different classes of metrics, depending on the specific kinds of analysis we seek to perform. For example, "recidivism" is too broad because there are a variety of different measurements for tracking "recidivism." However, "recidivism rate broken down by race" is too specific because breakdowns are just different levels of detail for a single measurement. "Recidivism rate" is the right level of abstraction, with different optional columns for specifying different breakdowns.

Metric classes

Each metric class should have its own table in BigQuery, with columns to match the following general structure:

  • Numerical values of the analysis, e.g. recidivism rate
  • Jurisdiction code indicating which jurisdiction this metric measures, e.g. state code
  • A “key” defining the parameters of the calculation:
    • Required normalization fields, e.g. a methodology flag or a release cohort label
    • Optional dimensional fields that outline the dimensions included in a particular calculation, e.g. race, sex, facility of incarceration
      • The dimensional fields in a given metric object represent the full range of possible dimensions supported for that type of metric, and the model will expand as new dimensionality is enabled.
      • Values of null in a particular instance represent that dimension being absent for that instance, i.e. the metric is agnostic of that dimension.
  • A timestamp recording when the metric was calculated to enable metrics with the same “key” (equivalent normalization and dimensional fields) to be monitored over time
  • A job id recording some identifier for the process that calculated the metric, to allow grouping together of calculations created in the same process

For example, a recidivism rate metric row might look like the following (field names and values are purely illustrative):
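
    # An illustrative recidivism rate metric row (hypothetical values throughout).
    recidivism_rate_metric = {
        # Jurisdiction and required normalization fields (part of the metric "key")
        "state_code": "US_XX",
        "methodology": "PERSON",
        "release_cohort": 2014,
        "follow_up_period": 5,
        # Optional dimensional fields; None (null) means the metric is agnostic
        # of that dimension
        "age_bucket": None,
        "gender": "FEMALE",
        "race": "BLACK",
        "stay_length_bucket": None,
        "release_facility": None,
        # The numerical value of the analysis
        "recidivism_rate": 0.36,
        # Bookkeeping: when the metric was calculated and by which job
        "created_on": "2020-10-20T00:00:00Z",
        "job_id": "2020-10-20_01_23_45_recidivism-calculations",
    }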

Retrieving output

Because metrics are written back into BigQuery, retrieving specific metrics is a matter of querying the correct metric table with the desired columns, i.e. required normalization settings and the dimensional fields that match the view of the data you want, plus optional time or job id filters to look at results from a specific point in time. This grants the ability to retrieve and filter output in a variety of ways. Some examples (a programmatic sketch follows the list):

  • Retrieve the most recent baseline metric, e.g. 5-year recidivism rates for the 2014 cohort. Note that this requires specifying each optional dimension you do not want a breakdown for as explicitly null
    • select * from project-id.dataset.recidivism_rate where job_id='most-recent-id' and methodology='PERSON' and release_cohort=2014 and follow_up_period=5 and state_code='US_XX' and age_bucket is null and stay_length_bucket is null and race is null and ...
  • Track how that baseline metric has changed over time
    • select * from project-id.dataset.recidivism_rate where methodology='PERSON' and release_cohort=2014 and follow_up_period=5 and state_code='US_XX' and age_bucket is null and stay_length_bucket is null and race is null and ... order by created_on desc
  • Filter that baseline metric by arbitrary dimensions, e.g. the baseline recidivism rate metric specifically for black women between 25-29 years old who were released from a particular prison
    • select * from project-id.dataset.recidivism_rate where job_id='most-recent-id' and methodology='PERSON' and release_cohort=2014 and follow_up_period=5 and state_code='US_XX' and age_bucket='25-29' and stay_length_bucket is null and race='BLACK' and gender='FEMALE' and release_facility='some-facility' and...
  • Break down that baseline metric by some combination of dimensions, e.g. the baseline recidivism rate metric for every combination of race and gender
    • select * from project-id.dataset.recidivism_rate where job_id='most-recent-id' and methodology='PERSON' and release_cohort=2014 and follow_up_period=5 and state_code='US_XX' and age_bucket is null and stay_length_bucket is null and race is not null and gender is not null and...
  • Compare along different normalization parameters, e.g. the baseline recidivism rate metric for different follow up periods and different states
    • select * from project-id.dataset.recidivism_rate where job_id='most-recent-id' and methodology='PERSON' and release_cohort=2014 and follow_up_period is not null and state_code is not null and age_bucket is null and stay_length_bucket is null and race is null and ...
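
The same lookups can be issued from application code. Here is a minimal sketch using the BigQuery Python client, with project, dataset, and column names following the illustrative examples above:

    # Sketch: retrieving the most recent baseline recidivism rate from Python
    # (project, dataset, and column names are illustrative).
    from google.cloud import bigquery

    client = bigquery.Client(project="project-id")

    query = """
        SELECT *
        FROM `project-id.dataset.recidivism_rate`
        WHERE methodology = 'PERSON'
          AND release_cohort = 2014
          AND follow_up_period = 5
          AND state_code = 'US_XX'
          AND age_bucket IS NULL
          AND stay_length_bucket IS NULL
          AND race IS NULL
        ORDER BY created_on DESC
        LIMIT 1
    """

    for row in client.query(query).result():
        print(dict(row))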

Many more kinds of queries can be structured to slice and dice batch processing output. The combination of full-dimensionality pre-calculation and structured post-processing querying is powerfully expressive.